Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.
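In addition to the transfer operators documented below, the provider exposes MySqlHook for direct SQL access from tasks. A minimal sketch, assuming a MySQL connection is configured under the default 'mysql_default' connection ID (the table name is illustrative):

from airflow.providers.mysql.hooks.mysql import MySqlHook

# Run an ad-hoc query through the provider's hook; the connection ID
# and table name here are examples, not requirements of the API.
hook = MySqlHook(mysql_conn_id='mysql_default')
row_count = hook.get_first('SELECT COUNT(*) FROM staging.users')[0]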
Transfer operators for moving data from various source systems into MySQL tables. These operators provide high-level task definitions for Airflow DAGs, supporting bulk loading, transformation options, and integration with multiple data sources.
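All four transfer operators live under the airflow.providers.mysql.transfers package; the import paths below are the same ones used in the examples later in this document:

from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator
from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator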
Transfer data from Amazon S3 files directly into MySQL tables using bulk loading operations.
class S3ToMySqlOperator(BaseOperator):
    """
    Load a file from S3 into a MySQL table.

    Template fields: s3_source_key, mysql_table
    """

    def __init__(
        self,
        s3_source_key: str,
        mysql_table: str,
        mysql_duplicate_key_handling: str = "IGNORE",
        mysql_extra_options: str | None = None,
        aws_conn_id: str = "aws_default",
        mysql_conn_id: str = "mysql_default",
        mysql_local_infile: bool = False,
        **kwargs,
    ):
        """
        Initialize the S3 to MySQL transfer operator.

        Parameters:
        - s3_source_key: S3 key path to the source file (templated)
        - mysql_table: Target MySQL table name (templated)
        - mysql_duplicate_key_handling: How LOAD DATA handles duplicate keys ("IGNORE" or "REPLACE")
        - mysql_extra_options: Additional MySQL LOAD DATA options
        - aws_conn_id: AWS connection ID for S3 credentials
        - mysql_conn_id: MySQL connection ID
        - mysql_local_infile: Enable the local_infile option on the underlying MySqlHook
        """

    def execute(self, context: Context) -> None:
        """
        Execute the S3 to MySQL data transfer.

        Downloads the file from S3 and loads it into MySQL using bulk operations.
        """

Transfer data from Vertica databases to MySQL, with support for both bulk loading and regular insert operations.
class VerticaToMySqlOperator(BaseOperator):
    """
    Move data from Vertica to MySQL.

    Template fields: sql, mysql_table, mysql_preoperator, mysql_postoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        vertica_conn_id: str = "vertica_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        **kwargs,
    ):
        """
        Initialize the Vertica to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against the Vertica database (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - vertica_conn_id: Source Vertica connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before the import (templated)
        - mysql_postoperator: SQL statement to run after the import (templated)
        - bulk_load: Use LOAD DATA LOCAL INFILE for bulk operations
        """

    def execute(self, context: Context):
        """
        Execute the Vertica to MySQL data transfer.

        Supports both bulk load (via a temporary file) and regular insert operations.
        """

Transfer data from Presto queries to MySQL tables using in-memory operations for small to medium datasets.
class PrestoToMySqlOperator(BaseOperator):
    """
    Move data from Presto to MySQL.

    Note: Data is loaded into memory, so this operator is suitable only for small amounts of data.

    Template fields: sql, mysql_table, mysql_preoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        presto_conn_id: str = "presto_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs,
    ):
        """
        Initialize the Presto to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Presto (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - presto_conn_id: Source Presto connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before the import (templated)
        """

    def execute(self, context: Context) -> None:
        """
        Execute the Presto to MySQL data transfer.

        Loads query results into memory before inserting them into MySQL.
        """

Transfer data from Trino queries to MySQL tables using in-memory operations for small to medium datasets.
class TrinoToMySqlOperator(BaseOperator):
    """
    Move data from Trino to MySQL.

    Note: Data is loaded into memory, so this operator is suitable only for small amounts of data.

    Template fields: sql, mysql_table, mysql_preoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        trino_conn_id: str = "trino_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs,
    ):
        """
        Initialize the Trino to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Trino (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - trino_conn_id: Source Trino connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before the import (templated)
        """

    def execute(self, context: Context) -> None:
        """
        Execute the Trino to MySQL data transfer.

        Loads query results into memory before inserting them into MySQL.
        """

from airflow import DAG
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime
dag = DAG('s3_mysql_transfer', start_date=datetime(2024, 1, 1))

# Basic S3 to MySQL transfer
s3_to_mysql = S3ToMySqlOperator(
    task_id='load_users_from_s3',
    s3_source_key='data/users/{{ ds }}/users.csv',
    mysql_table='staging.users',
    mysql_duplicate_key_handling='REPLACE',
    mysql_extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
    aws_conn_id='aws_default',
    mysql_conn_id='mysql_default',
    mysql_local_infile=True,
    dag=dag
)

from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator
# Bulk load transfer from Vertica
vertica_to_mysql_bulk = VerticaToMySqlOperator(
    task_id='transfer_vertica_bulk',
    sql='''
        SELECT user_id, username, email, created_date
        FROM users
        WHERE created_date >= '{{ ds }}'
    ''',
    mysql_table='staging.users',
    mysql_preoperator='TRUNCATE TABLE staging.users',
    mysql_postoperator='CALL update_user_stats()',
    bulk_load=True,
    vertica_conn_id='vertica_default',
    mysql_conn_id='mysql_default',
    dag=dag
)

# Regular insert transfer
vertica_to_mysql_insert = VerticaToMySqlOperator(
    task_id='transfer_vertica_insert',
    sql="SELECT * FROM daily_metrics WHERE date = '{{ ds }}'",
    mysql_table='analytics.daily_metrics',
    bulk_load=False,
    dag=dag
)

from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator
# Transfer aggregated data from Presto
presto_to_mysql = PrestoToMySqlOperator(
    task_id='load_presto_aggregates',
    sql='''
        SELECT
            date_trunc('day', event_time) AS event_date,
            event_type,
            count(*) AS event_count
        FROM events
        WHERE event_time >= date('{{ ds }}')
        GROUP BY 1, 2
    ''',
    mysql_table='analytics.event_summary',
    mysql_preoperator="DELETE FROM analytics.event_summary WHERE event_date = '{{ ds }}'",
    presto_conn_id='presto_default',
    mysql_conn_id='mysql_default',
    dag=dag
)

from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator
# Transfer processed data from Trino
trino_to_mysql = TrinoToMySqlOperator(
    task_id='load_trino_results',
    sql='''
        SELECT
            customer_id,
            product_category,
            sum(purchase_amount) AS total_spent
        FROM purchases
        WHERE purchase_date = date('{{ ds }}')
        GROUP BY customer_id, product_category
    ''',
    mysql_table='customer_analytics.daily_spending',
    mysql_preoperator='''
        CREATE TABLE IF NOT EXISTS customer_analytics.daily_spending (
            customer_id INT,
            product_category VARCHAR(100),
            total_spent DECIMAL(10,2),
            load_date DATE DEFAULT (CURDATE())  -- expression defaults require MySQL 8.0.13+
        )
    ''',
    trino_conn_id='trino_default',
    mysql_conn_id='mysql_default',
    dag=dag
)

All transfer operators support Airflow templating for dynamic values:
# Template variables available in sql, mysql_table, and preoperator fields
sql_with_templates = '''
    SELECT * FROM events
    WHERE event_date = '{{ ds }}'           -- current DAG run date
      AND event_time >= '{{ ts }}'          -- current DAG run timestamp
      AND user_id IN {{ params.user_ids }}  -- custom parameters
'''
mysql_table_template = 'staging.events_{{ ds_nodash }}'  # Table with date suffix
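The {{ params.user_ids }} reference above resolves only when a params mapping is supplied to the task (or DAG). A minimal sketch using the templates defined above; the task ID and parameter value are illustrative, not part of the operator's API:

# Hypothetical task: params supplies the value rendered into {{ params.user_ids }}
events_with_params = PrestoToMySqlOperator(
    task_id='load_filtered_events',
    sql=sql_with_templates,
    mysql_table=mysql_table_template,
    params={'user_ids': '(101, 102, 103)'},  # rendered verbatim into the SQL
    dag=dag
)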
# Configure retry behavior for transfer operations
from datetime import timedelta

transfer_operator = S3ToMySqlOperator(
    task_id='s3_transfer',
    s3_source_key='data/file.csv',
    mysql_table='staging.data',
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
def validate_transfer_results():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    count = hook.get_first('SELECT COUNT(*) FROM staging.users')[0]
    if count == 0:
        raise ValueError("No data transferred")
    return count

# Add validation after the transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_results,
    dag=dag
)

s3_to_mysql >> validate_task

# Base operator context for all transfer operations
from typing import Any, Dict

Context = Dict[str, Any]

# Transfer operation configuration
TransferConfig = {
    "source_conn_id": str,       # Source system connection ID
    "mysql_conn_id": str,        # MySQL connection ID (default: "mysql_default")
    "mysql_table": str,          # Target table (supports database.table notation)
    "mysql_preoperator": str,    # SQL to run before the transfer (optional)
    "mysql_postoperator": str,   # SQL to run after the transfer (optional)
}

# S3-specific configuration
S3TransferConfig = {
    "s3_source_key": str,                  # S3 object key (templated)
    "aws_conn_id": str,                    # AWS connection (default: "aws_default")
    "mysql_duplicate_key_handling": str,   # "IGNORE" or "REPLACE"
    "mysql_extra_options": str,            # Additional LOAD DATA options
    "mysql_local_infile": bool,            # Enable the local_infile feature
}

# Bulk load configuration for Vertica transfers
BulkLoadConfig = {
    "bulk_load": bool,       # Enable bulk loading via a temporary file
    "tmp_file_path": str,    # Temporary file location for bulk operations
}

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-mysql