tessl/pypi-apache-airflow-providers-mysql

Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.

Data Transfer Operations

Transfer operators for moving data from various source systems into MySQL tables. These operators provide high-level task definitions for Airflow DAGs, supporting bulk loading, transformation options, and integration with multiple data sources.

Capabilities

S3 to MySQL Transfer

Transfer data from Amazon S3 files directly into MySQL tables using bulk loading operations.

class S3ToMySqlOperator(BaseOperator):
    """
    Load a file from S3 into a MySQL table.
    
    Template fields: s3_source_key, mysql_table
    """
    
    def __init__(
        self,
        s3_source_key: str,
        mysql_table: str,
        mysql_duplicate_key_handling: str = "IGNORE",
        mysql_extra_options: str | None = None,
        aws_conn_id: str = "aws_default",
        mysql_conn_id: str = "mysql_default",
        mysql_local_infile: bool = False,
        **kwargs
    ):
        """
        Initialize S3 to MySQL transfer operator.
        
        Parameters:
        - s3_source_key: S3 key path to source file (templated)
        - mysql_table: Target MySQL table name (templated)
        - mysql_duplicate_key_handling: Handle duplicates ("IGNORE" or "REPLACE")
        - mysql_extra_options: Additional MySQL LOAD DATA options
        - aws_conn_id: S3 connection ID for credentials
        - mysql_conn_id: MySQL connection ID
        - mysql_local_infile: Enable local_infile option on MySQLHook
        """
    
    def execute(self, context: Context) -> None:
        """
        Execute S3 to MySQL data transfer.
        
        Downloads file from S3 and loads into MySQL using bulk operations.
        """
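Conceptually, the operator downloads the S3 object to a local file and bulk-loads it with a `LOAD DATA` statement (via the MySQL hook), with `mysql_duplicate_key_handling` and `mysql_extra_options` spliced into that statement. A minimal sketch of the statement shape (`build_load_data_sql` is a hypothetical helper for illustration, not part of the provider):

```python
def build_load_data_sql(table: str, tmp_file: str,
                        duplicate_key_handling: str = "IGNORE",
                        extra_options: str = "") -> str:
    # Hypothetical helper: assemble a statement in the shape the operator
    # issues for the downloaded S3 file (simplified illustration).
    return (
        f"LOAD DATA LOCAL INFILE '{tmp_file}' "
        f"{duplicate_key_handling} INTO TABLE {table} "
        f"{extra_options}"
    ).strip()

print(build_load_data_sql("staging.users", "/tmp/users.csv", "REPLACE",
                          "FIELDS TERMINATED BY ','"))
```

This is why `mysql_local_infile=True` matters for some deployments: `LOAD DATA LOCAL INFILE` is rejected by the server unless the client enables `local_infile`.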

Vertica to MySQL Transfer

Transfer data from Vertica databases to MySQL with support for both bulk loading and regular insert operations.

class VerticaToMySqlOperator(BaseOperator):
    """
    Move data from Vertica to MySQL.
    
    Template fields: sql, mysql_table, mysql_preoperator, mysql_postoperator
    """
    
    def __init__(
        self,
        sql: str,
        mysql_table: str,
        vertica_conn_id: str = "vertica_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        **kwargs
    ):
        """
        Initialize Vertica to MySQL transfer operator.
        
        Parameters:
        - sql: SQL query to execute against Vertica database (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - vertica_conn_id: Source Vertica connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before import (templated)
        - mysql_postoperator: SQL statement to run after import (templated)
        - bulk_load: Use LOAD DATA LOCAL INFILE for bulk operations
        """
    
    def execute(self, context: Context):
        """
        Execute Vertica to MySQL data transfer.
        
        Supports both bulk load (via temporary files) and regular insert operations.
        """
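With `bulk_load=True`, the query results are spilled to a delimited temporary file which is then fed to `LOAD DATA LOCAL INFILE`, avoiding per-row inserts. A simplified sketch of the spill step, assuming tab-separated output (the real operator streams from the Vertica cursor; `spill_rows_to_tmp_file` is illustrative only):

```python
import csv
import tempfile

def spill_rows_to_tmp_file(rows):
    """Write fetched rows to a tab-separated temp file, mirroring the
    bulk_load path before LOAD DATA LOCAL INFILE picks the file up."""
    f = tempfile.NamedTemporaryFile("w", delete=False, newline="", suffix=".tsv")
    writer = csv.writer(f, delimiter="\t")
    writer.writerows(rows)
    f.close()
    return f.name

path = spill_rows_to_tmp_file([(1, "alice"), (2, "bob")])
```

With `bulk_load=False`, rows are instead fetched and inserted directly, which is simpler but slower for large result sets.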

Presto to MySQL Transfer

Transfer data from Presto queries to MySQL tables using in-memory operations for small to medium datasets.

class PrestoToMySqlOperator(BaseOperator):
    """
    Move data from Presto to MySQL.
    
    Note: Data is loaded into memory, suitable for small amounts of data.
    Template fields: sql, mysql_table, mysql_preoperator
    """
    
    def __init__(
        self,
        sql: str,
        mysql_table: str,
        presto_conn_id: str = "presto_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs
    ):
        """
        Initialize Presto to MySQL transfer operator.
        
        Parameters:
        - sql: SQL query to execute against Presto (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - presto_conn_id: Source Presto connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before import (templated)
        """
    
    def execute(self, context: Context) -> None:
        """
        Execute Presto to MySQL data transfer.
        
        Loads query results into memory before inserting into MySQL.
        """

Trino to MySQL Transfer

Transfer data from Trino queries to MySQL tables using in-memory operations for small to medium datasets.

class TrinoToMySqlOperator(BaseOperator):
    """
    Move data from Trino to MySQL.
    
    Note: Data is loaded into memory, suitable for small amounts of data.
    Template fields: sql, mysql_table, mysql_preoperator
    """
    
    def __init__(
        self,
        sql: str,
        mysql_table: str,
        trino_conn_id: str = "trino_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs
    ):
        """
        Initialize Trino to MySQL transfer operator.
        
        Parameters:
        - sql: SQL query to execute against Trino (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)  
        - trino_conn_id: Source Trino connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before import (templated)
        """
    
    def execute(self, context: Context) -> None:
        """
        Execute Trino to MySQL data transfer.
        
        Loads query results into memory before inserting into MySQL.
        """
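Both the Presto and Trino operators hold the entire result set in memory before inserting it into MySQL. If a result set pushes memory or packet-size limits, one option is to wrap the hook calls yourself and insert in bounded batches. A minimal sketch of the batching pattern (`chunked` is a hypothetical helper, not part of the provider):

```python
def chunked(rows, size=1000):
    # Yield successive bounded slices of a fetched result set, so each
    # insert batch stays small.
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

batches = list(chunked(list(range(5)), size=2))
```

Each batch can then be passed to the MySQL hook's row-insert method in turn instead of inserting everything at once.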

Usage Examples

S3 to MySQL Transfer

from airflow import DAG
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime

dag = DAG('s3_mysql_transfer', start_date=datetime(2024, 1, 1))

# Basic S3 to MySQL transfer
s3_to_mysql = S3ToMySqlOperator(
    task_id='load_users_from_s3',
    s3_source_key='data/users/{{ ds }}/users.csv',
    mysql_table='staging.users',
    mysql_duplicate_key_handling='REPLACE',
    mysql_extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
    aws_conn_id='aws_default',
    mysql_conn_id='mysql_default',
    mysql_local_infile=True,
    dag=dag
)

Vertica to MySQL Transfer

from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator

# Bulk load transfer from Vertica
vertica_to_mysql_bulk = VerticaToMySqlOperator(
    task_id='transfer_vertica_bulk',
    sql='''
        SELECT user_id, username, email, created_date
        FROM users 
        WHERE created_date >= '{{ ds }}'
    ''',
    mysql_table='staging.users',
    mysql_preoperator='TRUNCATE TABLE staging.users',
    mysql_postoperator='CALL update_user_stats()',
    bulk_load=True,
    vertica_conn_id='vertica_default',
    mysql_conn_id='mysql_default',
    dag=dag
)

# Regular insert transfer
vertica_to_mysql_insert = VerticaToMySqlOperator(
    task_id='transfer_vertica_insert',
    sql="SELECT * FROM daily_metrics WHERE date = '{{ ds }}'",
    mysql_table='analytics.daily_metrics',
    bulk_load=False,
    dag=dag
)

Presto to MySQL Transfer

from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator

# Transfer aggregated data from Presto
presto_to_mysql = PrestoToMySqlOperator(
    task_id='load_presto_aggregates',
    sql='''
        SELECT 
            date_trunc('day', event_time) as event_date,
            event_type,
            count(*) as event_count
        FROM events 
        WHERE event_time >= date('{{ ds }}')
        GROUP BY 1, 2
    ''',
    mysql_table='analytics.event_summary',
    mysql_preoperator="DELETE FROM analytics.event_summary WHERE event_date = '{{ ds }}'",
    presto_conn_id='presto_default',
    mysql_conn_id='mysql_default',
    dag=dag
)

Trino to MySQL Transfer

from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator

# Transfer processed data from Trino
trino_to_mysql = TrinoToMySqlOperator(
    task_id='load_trino_results', 
    sql='''
        SELECT 
            customer_id,
            product_category,
            sum(purchase_amount) as total_spent
        FROM purchases
        WHERE purchase_date = date('{{ ds }}')
        GROUP BY customer_id, product_category
    ''',
    mysql_table='customer_analytics.daily_spending',
    mysql_preoperator='''
        CREATE TABLE IF NOT EXISTS customer_analytics.daily_spending (
            customer_id INT,
            product_category VARCHAR(100),
            total_spent DECIMAL(10,2),
            load_date DATE DEFAULT (CURDATE())  -- expression defaults require MySQL 8.0.13+
        )
    ''',
    trino_conn_id='trino_default',
    mysql_conn_id='mysql_default',
    dag=dag
)

Transfer Operation Patterns

Template Variables

All transfer operators support Airflow templating for dynamic values:

# Template variables can be used in the sql, mysql_table, and pre/postoperator
# fields. {{ ds }} is the logical (DAG run) date, {{ ts }} the logical
# timestamp, and {{ params.user_ids }} a custom parameter passed to the task.
sql_with_templates = '''
    SELECT * FROM events
    WHERE event_date = '{{ ds }}'
    AND event_time >= '{{ ts }}'
    AND user_id IN {{ params.user_ids }}
'''

mysql_table_template = 'staging.events_{{ ds_nodash }}'  # table with date suffix
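To make the macro expansion concrete, here is what `{{ ds }}` and `{{ ds_nodash }}` resolve to at runtime for a run with logical date 2024-01-15 (plain Python illustration, no Airflow required):

```python
from datetime import date

# What the template macros expand to for a given logical date
logical_date = date(2024, 1, 15)
ds = logical_date.isoformat()        # value of {{ ds }}
ds_nodash = ds.replace("-", "")      # value of {{ ds_nodash }}
table = f"staging.events_{ds_nodash}"
```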

Error Handling and Retries

from datetime import timedelta

# Configure retry behavior for transfer operations
transfer_operator = S3ToMySqlOperator(
    task_id='s3_transfer',
    s3_source_key='data/file.csv',
    mysql_table='staging.data',
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

Data Quality Checks

from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

def validate_transfer_results():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    count = hook.get_first('SELECT COUNT(*) FROM staging.users')[0]
    if count == 0:
        raise ValueError("No data transferred")
    return count

# Add validation after transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_results,
    dag=dag
)

s3_to_mysql >> validate_task

Type Definitions

# Base operator context passed to execute() (airflow.utils.context.Context,
# approximated here as a plain mapping)
Context = Dict[str, Any]

# Transfer operation configuration
TransferConfig = {
    "source_conn_id": str,       # Source system connection ID
    "mysql_conn_id": str,        # MySQL connection ID (default: "mysql_default")
    "mysql_table": str,          # Target table (supports database.table notation)
    "mysql_preoperator": str,    # SQL to run before transfer (optional)
    "mysql_postoperator": str,   # SQL to run after transfer (optional) 
}

# S3 specific configuration
S3TransferConfig = {
    "s3_source_key": str,                    # S3 object key (templated)
    "aws_conn_id": str,                      # AWS connection (default: "aws_default")
    "mysql_duplicate_key_handling": str,     # "IGNORE" or "REPLACE"
    "mysql_extra_options": str,              # Additional LOAD DATA options
    "mysql_local_infile": bool               # Enable local_infile feature
}

# Bulk load configuration for Vertica transfers
BulkLoadConfig = {
    "bulk_load": bool,           # Enable bulk loading via temporary files
    "tmp_file_path": str,        # Temporary file location for bulk operations
}

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-mysql
