Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.
```shell
npx @tessl/cli install tessl/pypi-apache-airflow-providers-mysql@6.3.0
```

Provider package for Apache Airflow that enables comprehensive MySQL database integration within data workflows. This package provides hooks for database connections, transfer operators for moving data between MySQL and other systems, and asset URI handling for MySQL and MariaDB schemes.

```shell
pip install apache-airflow-providers-mysql
```

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook
```

For transfer operations:
```python
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator
from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator
```

For asset handling:
```python
from airflow.providers.mysql.assets.mysql import sanitize_uri
```

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator

# Define default DAG arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'mysql_data_pipeline',
    default_args=default_args,
    description='MySQL data processing pipeline',
    schedule=timedelta(days=1),
    catchup=False,
)

# Use MySqlHook for direct database operations
def query_mysql_data():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    records = hook.get_records(
        'SELECT * FROM users WHERE created_date >= %s',
        (datetime.now().date(),),
    )
    return records

query_task = PythonOperator(
    task_id='query_mysql_data',
    python_callable=query_mysql_data,
    dag=dag,
)

# Use S3ToMySqlOperator for data transfer
s3_to_mysql_task = S3ToMySqlOperator(
    task_id='load_data_from_s3',
    s3_source_key='data/users.csv',
    mysql_table='staging.users',
    mysql_conn_id='mysql_default',
    aws_conn_id='aws_default',
    dag=dag,
)
```

The MySQL provider package follows Airflow's provider pattern with three main components:
The package supports multiple MySQL client libraries (mysqlclient, mysql-connector-python, aiomysql) with automatic client selection and configuration, providing flexibility for different deployment environments and requirements.
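The fallback behavior of automatic client selection can be sketched as follows. This is an illustrative sketch only, not the provider's actual code; the function name `load_mysql_client` is hypothetical, and the real selection also honors the `client` field in the connection extras:

```python
def load_mysql_client(preferred: str = "mysqlclient"):
    # Hypothetical sketch: try the preferred synchronous driver first,
    # fall back to the alternative, and fail loudly if neither exists.
    try:
        import MySQLdb  # provided by the mysqlclient package
        return MySQLdb
    except ImportError:
        pass
    try:
        import mysql.connector  # provided by mysql-connector-python
        return mysql.connector
    except ImportError:
        raise RuntimeError(
            "No MySQL client library found; "
            "install mysqlclient or mysql-connector-python"
        )
```

In this sketch, a missing driver surfaces as the same `RuntimeError` the provider documents for absent client libraries.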
Core MySQL database connectivity and operations are provided through MySqlHook, which supports multiple MySQL clients, connection management, bulk operations, and AWS IAM authentication.

```python
class MySqlHook(DbApiHook):
    def __init__(self, schema=None, local_infile=False, init_command=None, **kwargs): ...
    def get_conn(self): ...
    def bulk_load(self, table: str, tmp_file: str) -> None: ...
    def bulk_dump(self, table: str, tmp_file: str) -> None: ...
    def get_uri(self) -> str: ...
```

Transfer operators move data from various source systems (S3, Vertica, Presto, Trino) into MySQL tables, with support for bulk loading and transformation options.
```python
class S3ToMySqlOperator(BaseOperator): ...
class VerticaToMySqlOperator(BaseOperator): ...
class PrestoToMySqlOperator(BaseOperator): ...
class TrinoToMySqlOperator(BaseOperator): ...
```

URI sanitization and validation for MySQL and MariaDB assets ensures proper format and default port assignment for dataset tracking and lineage.
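As an illustrative sketch only (not the provider's implementation), a sanitizer with the behavior described above, built from the standard library's `urllib.parse`, might look like:

```python
from urllib.parse import SplitResult, urlsplit

def sketch_sanitize_uri(uri: SplitResult) -> SplitResult:
    # Illustrative sketch mirroring the documented behavior: accept the
    # mysql/mariadb schemes, require a host, and fill in MySQL's default
    # port 3306 when none is given.
    if uri.scheme not in ("mysql", "mariadb"):
        raise ValueError(f"unsupported scheme: {uri.scheme!r}")
    if not uri.hostname:
        raise ValueError("asset URI must include a host")
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:3306")
    return uri

print(sketch_sanitize_uri(urlsplit("mysql://db.example.com/sales/orders")).geturl())
# mysql://db.example.com:3306/sales/orders
```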
```python
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```

MySQL connections are configured through Airflow's connection management system with the connection type `mysql`. The provider supports multiple MySQL client libraries and authentication methods, including AWS IAM.
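For example, a connection can be supplied through Airflow's standard environment-variable mechanism (the host, credentials, and schema below are placeholders):

```shell
# Defines the mysql_default connection as a URI without touching the UI.
export AIRFLOW_CONN_MYSQL_DEFAULT='mysql://user:pass@db.example.com:3306/mydb'
```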
```
# Connection configuration options in the extra field
{
    "charset": "utf8",                 # Character set
    "cursor": "SSCursor",              # Cursor type (SSCursor, DictCursor, SSDictCursor)
    "ssl": {...},                      # SSL configuration dictionary
    "ssl_mode": "REQUIRED",            # SSL mode
    "unix_socket": "/path/to/socket",  # Unix socket path
    "client": "mysqlclient",           # MySQL client library
    "iam": true,                       # Enable AWS IAM authentication
    "aws_conn_id": "aws_default"       # AWS connection for IAM auth
}
```

The package includes comprehensive error handling for connection failures, authentication issues, and data transfer problems. Common exceptions include connection timeouts, authentication failures, and data format errors.
```python
# Common exception types
RuntimeError                             # Missing MySQL client libraries
ValueError                               # Invalid table names or URI formats
AirflowOptionalProviderFeatureException  # Missing optional dependencies
```
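As a hedged illustration of the kind of guard behind the `ValueError` above, a table-name check might look like the following; the function name `validate_mysql_table` is hypothetical and the provider's actual validation rules may differ:

```python
import re

def validate_mysql_table(table: str) -> str:
    # Hypothetical sketch: allow an optional schema qualifier and reject
    # anything outside MySQL's unquoted-identifier character set.
    if not re.fullmatch(r"[0-9a-zA-Z_$]+(\.[0-9a-zA-Z_$]+)?", table):
        raise ValueError(f"invalid MySQL table name: {table!r}")
    return table

validate_mysql_table("staging.users")  # accepted, matching the DAG example above
```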