Apache Airflow provider package enabling MySQL database integration with hooks, operators, and transfer functionality for data workflows.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-mysql@6.3.0
# Apache Airflow MySQL Provider

Provider package for Apache Airflow that enables comprehensive MySQL database integration within data workflows. This package provides hooks for database connections, transfer operators for moving data between MySQL and other systems, and asset URI handling for MySQL and MariaDB schemes.

## Package Information

- **Package Name**: apache-airflow-providers-mysql
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-mysql`

## Core Imports

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook
```

For transfer operations:

```python
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator
from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator
```

For asset handling:

```python
from airflow.providers.mysql.assets.mysql import sanitize_uri
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator

# Define default DAG arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'mysql_data_pipeline',
    default_args=default_args,
    description='MySQL data processing pipeline',
    schedule=timedelta(days=1),
    catchup=False,
)

# Use MySqlHook for direct database operations,
# e.g. from a PythonOperator callable
def query_mysql_data():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    records = hook.get_records(
        'SELECT * FROM users WHERE created_date >= %s',
        (datetime.now().date(),),
    )
    return records

# Use S3ToMySqlOperator to bulk-load a file from S3 into a MySQL table
s3_to_mysql_task = S3ToMySqlOperator(
    task_id='load_data_from_s3',
    s3_source_key='data/users.csv',
    mysql_table='staging.users',
    mysql_conn_id='mysql_default',
    aws_conn_id='aws_default',
    dag=dag,
)
```

## Architecture

The MySQL provider package follows Airflow's provider pattern with three main components:

- **Hooks**: Low-level interfaces for database connections and operations (MySqlHook)
- **Operators**: High-level task definitions for data transfers between systems
- **Assets**: URI handlers for MySQL/MariaDB dataset tracking and lineage

The package supports multiple MySQL client libraries (mysqlclient, mysql-connector-python, aiomysql) with automatic client selection and configuration, providing flexibility for different deployment environments and requirements.

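For illustration, here is a minimal sketch of steering client selection per connection via the `client` key in the connection extras; the connection id, host, and credentials are placeholders, not part of the package:

```python
from airflow.models.connection import Connection

# Hypothetical connection; the "client" extra picks the driver MySqlHook
# uses: "mysqlclient" (the default) or "mysql-connector-python".
conn = Connection(
    conn_id='mysql_reporting',
    conn_type='mysql',
    host='mysql.example.com',
    schema='reporting',
    login='reporter',
    password='change-me',
    extra='{"client": "mysql-connector-python", "charset": "utf8"}',
)

# One way to register it: export the URI form as an environment variable
# (AIRFLOW_CONN_<CONN_ID>) before the scheduler/worker starts.
print(f"AIRFLOW_CONN_MYSQL_REPORTING={conn.get_uri()}")
```
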
## Capabilities

### Database Operations

Core MySQL database connectivity and operations through MySqlHook, supporting multiple MySQL clients, connection management, bulk operations, and AWS IAM authentication.

```python { .api }
class MySqlHook(DbApiHook):
    def __init__(self, schema=None, local_infile=False, init_command=None, **kwargs): ...
    def get_conn(self): ...
    def bulk_load(self, table: str, tmp_file: str) -> None: ...
    def bulk_dump(self, table: str, tmp_file: str) -> None: ...
    def get_uri(self) -> str: ...
```

[Database Operations](./database-operations.md)

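As a minimal sketch (table names, file path, and connection id are assumed for illustration), the hook supports both row-level queries and file-based bulk operations:

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Assumes an Airflow connection named "mysql_default" exists
hook = MySqlHook(mysql_conn_id='mysql_default', schema='analytics')

# Parameterized query returning all matching rows
rows = hook.get_records('SELECT id, email FROM users WHERE active = %s', (1,))

# bulk_dump/bulk_load wrap SELECT ... INTO OUTFILE and LOAD DATA INFILE,
# so the path must be readable/writable by the MySQL server itself
hook.bulk_dump('users', '/tmp/users.tsv')
hook.bulk_load('users_copy', '/tmp/users.tsv')
```
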
### Data Transfer Operations

Transfer operators for moving data from various source systems (S3, Vertica, Presto, Trino) into MySQL tables, with support for bulk loading and transformation options.

```python { .api }
class S3ToMySqlOperator(BaseOperator): ...
class VerticaToMySqlOperator(BaseOperator): ...
class PrestoToMySqlOperator(BaseOperator): ...
class TrinoToMySqlOperator(BaseOperator): ...
```

[Data Transfer Operations](./data-transfer-operations.md)

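For example, a query-based transfer might look like the following sketch; the SQL, table name, and the `dag` object (reused from the Basic Usage example) are illustrative:

```python
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator

# Runs the SQL against Trino and inserts the resulting rows into MySQL;
# mysql_preoperator optionally runs first, e.g. to make the load idempotent.
load_daily_sales = TrinoToMySqlOperator(
    task_id='trino_to_mysql',
    sql='SELECT day, SUM(amount) AS total FROM sales GROUP BY day',
    mysql_table='reporting.daily_sales',
    trino_conn_id='trino_default',
    mysql_conn_id='mysql_default',
    mysql_preoperator='TRUNCATE TABLE reporting.daily_sales',
    dag=dag,  # the DAG defined in the Basic Usage example
)
```
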
### Asset URI Handling

URI sanitization and validation for MySQL and MariaDB assets, ensuring proper format and default port assignment for dataset tracking and lineage.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```

[Asset URI Handling](./asset-uri-handling.md)

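As a quick sketch of the sanitizer's behavior (the URI below is an example; 3306 is MySQL's standard default port):

```python
from urllib.parse import urlsplit, urlunsplit

from airflow.providers.mysql.assets.mysql import sanitize_uri

# Asset URIs take the form mysql://host[:port]/database/table
parts = urlsplit('mysql://db.example.com/shop/orders')

# Validates the shape and fills in the default port when omitted
clean = sanitize_uri(parts)
print(urlunsplit(clean))  # expected: mysql://db.example.com:3306/shop/orders
```
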
## Connection Configuration

MySQL connections are configured through Airflow's connection management system with the connection type `mysql`. The provider supports multiple MySQL client libraries and authentication methods, including AWS IAM.

### Connection Parameters

```python { .api }
# Connection configuration options in the connection's extra field
{
    "charset": "utf8",                 # Character set
    "cursor": "SSCursor",              # Cursor type (SSCursor, DictCursor, SSDictCursor)
    "ssl": {...},                      # SSL configuration dictionary
    "ssl_mode": "REQUIRED",            # SSL mode
    "unix_socket": "/path/to/socket",  # Unix socket path
    "client": "mysqlclient",           # MySQL client library
    "iam": True,                       # Enable AWS IAM authentication
    "aws_conn_id": "aws_default",      # AWS connection for IAM auth
}
```

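One way to supply these options (host, credentials, and values below are placeholders) is the URI form of a connection, where query parameters become keys in the extra field:

```python
import os

# Placeholder credentials; Airflow reads AIRFLOW_CONN_<CONN_ID> at runtime
os.environ['AIRFLOW_CONN_MYSQL_DEFAULT'] = (
    'mysql://user:password@mysql.example.com:3306/mydb'
    '?charset=utf8&ssl_mode=REQUIRED&client=mysqlclient'
)
```
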
## Error Handling

The package includes error handling for connection failures, authentication issues, and data transfer problems. Common failure modes include connection timeouts, authentication failures, and data format errors.

```python { .api }
# Common exception types
RuntimeError                              # Missing MySQL client libraries
ValueError                                # Invalid table names or URI formats
AirflowOptionalProviderFeatureException   # Missing optional dependencies
```
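
A minimal sketch of handling these in task code (the connection id and SQL are placeholders):

```python
import logging

from airflow.providers.mysql.hooks.mysql import MySqlHook

log = logging.getLogger(__name__)

def safe_count():
    hook = MySqlHook(mysql_conn_id='mysql_default')
    try:
        return hook.get_records('SELECT COUNT(*) FROM users')
    except RuntimeError:
        # Typically raised when the configured client library is not installed
        log.exception("MySQL client library unavailable")
        raise
    except ValueError:
        # E.g. an invalid table name passed to bulk_load/bulk_dump
        log.exception("Invalid input for MySQL operation")
        raise
```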