# Database Operations

MySqlHook provides core MySQL connectivity and operations. It manages connections, executes queries and runs bulk loads and dumps. It works with either of two MySQL client libraries: mysqlclient (MySQLdb) or mysql-connector-python.

## Capabilities

### MySQL Hook

The MySqlHook class extends Airflow's DbApiHook with MySQL-specific behavior. This covers choosing the client library, reading connection options from the Airflow connection extra, and optional AWS IAM authentication.

```python { .api }
class MySqlHook(DbApiHook):
    """
    Interact with MySQL databases.

    Attributes:
        - conn_name_attr: "mysql_conn_id"
        - default_conn_name: "mysql_default"
        - conn_type: "mysql"
        - hook_name: "MySQL"
        - supports_autocommit: True
    """

    def __init__(
        self,
        *args,
        schema: str | None = None,
        local_infile: bool = False,
        init_command: str | None = None,
        **kwargs,
    ):
        """
        Initialize the MySQL hook.

        Parameters:
            - schema: MySQL database (schema) to connect to; overrides the schema of the Airflow connection
            - local_infile: Enable the MySQL local_infile feature needed by LOAD DATA LOCAL INFILE (default: False)
            - init_command: SQL statement to issue when the connection is opened
        """
```

### Connection Management

Establish connections to MySQL databases. The "client" key of the Airflow connection extra selects the client library for each connection. Authentication uses either the stored password or an AWS IAM token.

```python { .api }
def get_conn(self) -> MySQLConnectionTypes:
    """
    Get a connection to a MySQL database.

    Builds the connection configuration from the Airflow connection.
    Supports mysqlclient (the default) and mysql-connector-python,
    selected through the "client" key of the connection extra.

    Returns:
        MySQL connection object (MySQLdb or mysql.connector connection)

    Raises:
        RuntimeError: If the required MySQL client library is not installed
        ValueError: If an unknown MySQL client name is provided
        AirflowOptionalProviderFeatureException: If an optional dependency is missing
    """

def get_uri(self) -> str:
    """
    Get the URI for the MySQL connection.

    Generates the connection URI from the client library and connection parameters.

    Returns:
        Connection URI string (mysql:// or mysql+mysqlconnector://)
    """
```
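
The get_uri scheme depends on the client library. The stdlib sketch below illustrates that dependency. The function build_mysql_uri, its parameters and its default port of 3306 are hypothetical and not part of the provider. The sketch assumes that credentials need percent-encoding. It also assumes that mysql-connector-python maps to the mysql+mysqlconnector scheme, as the docstring above states.

```python
from urllib.parse import quote

# Hypothetical mapping from the "client" extra to a URI scheme (illustrative only)
SCHEMES = {
    "mysqlclient": "mysql",
    "mysql-connector-python": "mysql+mysqlconnector",
}


def build_mysql_uri(
    login: str,
    password: str,
    host: str,
    port: int = 3306,
    database: str = "",
    client: str = "mysqlclient",
) -> str:
    """Assemble a MySQL URI; a sketch, not the provider's get_uri()."""
    if client not in SCHEMES:
        raise ValueError(f"Unknown MySQL client name: {client!r}")
    # Percent-encode credentials so characters such as '@' or ':' stay unambiguous
    userinfo = f"{quote(login, safe='')}:{quote(password, safe='')}"
    return f"{SCHEMES[client]}://{userinfo}@{host}:{port}/{database}"


print(build_mysql_uri("airflow", "p@ss:word", "db.example.com", database="analytics"))
# mysql://airflow:p%40ss%3Aword@db.example.com:3306/analytics
```

Without percent-encoding, a password containing "@" would make the host part of the URI ambiguous.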

### Autocommit Control

Manage transaction autocommit behavior across both supported client libraries, which expose the setting differently.

```python { .api }
def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:
    """
    Set autocommit mode for a MySQL connection.

    Handles the difference between mysqlclient (autocommit is a method) and
    mysql-connector-python (autocommit is a property).

    Parameters:
        - conn: MySQL connection object
        - autocommit: Enable/disable autocommit
    """

def get_autocommit(self, conn: MySQLConnectionTypes) -> bool:
    """
    Get the current autocommit setting for a MySQL connection.

    Parameters:
        - conn: MySQL connection object

    Returns:
        Current autocommit setting (True/False)
    """
```
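
Two stand-in connection classes can sketch the method-versus-property difference described above. FakeMySQLdbConnection, FakeConnectorConnection and both helper functions are hypothetical stubs written for this illustration. The helpers dispatch on whether autocommit is callable. That is one possible approach and not necessarily how MySqlHook does it.

```python
class FakeMySQLdbConnection:
    """Stand-in for a mysqlclient connection: autocommit is a method."""

    def __init__(self) -> None:
        self._autocommit = False

    def autocommit(self, on: bool) -> None:
        self._autocommit = on

    def get_autocommit(self) -> bool:
        return self._autocommit


class FakeConnectorConnection:
    """Stand-in for a mysql-connector-python connection.

    A plain attribute plays the role of the connector's autocommit property.
    """

    def __init__(self) -> None:
        self.autocommit = False


def set_autocommit(conn, autocommit: bool) -> None:
    # mysqlclient exposes autocommit(flag); the connector exposes an assignable attribute
    if callable(getattr(conn, "autocommit", None)):
        conn.autocommit(autocommit)
    else:
        conn.autocommit = autocommit


def get_autocommit(conn) -> bool:
    # mysqlclient reports the flag via get_autocommit(); the connector via the attribute
    if callable(getattr(conn, "autocommit", None)):
        return conn.get_autocommit()
    return conn.autocommit
```

Duck typing keeps the helpers independent of which client library is installed. The trade-off is that the check relies on the attribute's shape rather than the connection's type.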

### Bulk Data Operations

Load and dump large datasets with MySQL's native bulk statements. LOAD DATA LOCAL INFILE reads a file from the machine running the hook. SELECT ... INTO OUTFILE writes a file on the MySQL server host.

```python { .api }
def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load a tab-delimited file into a database table using LOAD DATA LOCAL INFILE.

    Requires the local_infile feature on the client side (for example,
    local_infile=True on the hook) and on the MySQL server.

    Parameters:
        - table: Target table name (validated for safety)
        - tmp_file: Path to the tab-delimited file on the machine running the hook

    Raises:
        ValueError: If the table name contains invalid characters
    """

def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump a database table into a tab-delimited file using SELECT ... INTO OUTFILE.

    The MySQL server writes the file, so tmp_file is a path on the server
    host and the MySQL user needs the FILE privilege.

    Parameters:
        - table: Source table name (validated for safety)
        - tmp_file: Output file path on the MySQL server host

    Raises:
        ValueError: If the table name contains invalid characters
    """

def bulk_load_custom(
    self,
    table: str,
    tmp_file: str,
    duplicate_key_handling: str = "IGNORE",
    extra_options: str = "",
) -> None:
    """
    Load data with configurable options using LOAD DATA LOCAL INFILE.

    Warning: the MySQL documentation describes LOAD DATA LOCAL as a security
    risk; it must be enabled explicitly on both the client and the server.

    Parameters:
        - table: Target table name
        - tmp_file: Path to the data file on the machine running the hook
        - duplicate_key_handling: "IGNORE" or "REPLACE" for rows that duplicate an existing unique key
        - extra_options: Additional SQL options for the LOAD DATA statement (e.g. FIELDS or LINES clauses)
    """
```

### AWS IAM Authentication

Support for AWS IAM database authentication, which replaces the stored password with a short-lived token.

```python { .api }
def get_iam_token(self, conn: Connection) -> tuple[str, int]:
    """
    Retrieve a temporary password for AWS IAM authentication to MySQL.

    Uses the AWS RDS generate_db_auth_token API to create the temporary password.

    Parameters:
        - conn: Airflow connection with IAM configuration

    Returns:
        Tuple of (temporary_password, port)

    Configuration in the connection extra:
        {"iam": true, "aws_conn_id": "aws_default"}
    """
```
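
The stdlib sketch below shows one way to read the IAM settings from the connection extra. The helper iam_settings and the IamSettings tuple are hypothetical. Two of its defaults are assumptions for this illustration: aws_default when no aws_conn_id is given, and MySQL's standard port 3306 when the connection has no port. Generating the token itself requires AWS credentials and the RDS client, so the sketch omits that step.

```python
import json
from typing import NamedTuple, Optional


class IamSettings(NamedTuple):
    enabled: bool
    aws_conn_id: str
    port: int


def iam_settings(extra_json: str, port: Optional[int] = None) -> IamSettings:
    """Parse IAM options from a connection extra string (illustrative sketch)."""
    extra = json.loads(extra_json) if extra_json else {}
    return IamSettings(
        enabled=bool(extra.get("iam", False)),
        # Assumed default AWS connection ID when the extra does not name one
        aws_conn_id=extra.get("aws_conn_id", "aws_default"),
        # Assumed fallback: MySQL's standard port when the connection has none
        port=port if port is not None else 3306,
    )
```

For example, iam_settings('{"iam": true}') reports IAM as enabled with aws_default and port 3306. These are the inputs a token request would need alongside the host and login.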

### OpenLineage Integration

Database metadata that OpenLineage uses to record data lineage.

```python { .api }
def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return MySQL-specific information for OpenLineage data lineage.

    Parameters:
        - connection: Airflow connection for the database

    Returns:
        DatabaseInfo object with the MySQL scheme and authority (host:port) information
    """

def get_openlineage_database_dialect(self, _) -> str:
    """
    Return the database dialect identifier.

    Returns:
        "mysql"
    """

def get_openlineage_default_schema(self) -> None:
    """
    Return the default schema. In MySQL a schema is a synonym for a
    database, so there is no separate default schema.

    Returns:
        None
    """
```
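
OpenLineage identifies each dataset by a namespace and a name. This sketch assumes OpenLineage's dataset naming convention for MySQL: the namespace is mysql://{host}:{port} and the name is {database}.{table}. Check the current OpenLineage spec before relying on it. The helper mysql_dataset_id and the DatasetId tuple are hypothetical and do not use the DatabaseInfo class.

```python
from typing import NamedTuple


class DatasetId(NamedTuple):
    namespace: str
    name: str


def mysql_dataset_id(host: str, port: int, database: str, table: str) -> DatasetId:
    """Build an OpenLineage-style dataset identifier for a MySQL table (sketch)."""
    # Namespace identifies the server; the name has no extra schema level,
    # since in MySQL the database and the schema are the same thing
    return DatasetId(namespace=f"mysql://{host}:{port}", name=f"{database}.{table}")
```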

### Data Serialization

Handle conversion of Python values to database literals for MySQL operations.

```python { .api }
@staticmethod
def _serialize_cell(cell: object, conn: Connection | None = None) -> Any:
    """
    Convert an argument to a database literal.

    MySQLdb converts arguments to literals itself when they are passed
    separately to execute(), so this method returns the cell unchanged.

    Parameters:
        - cell: Data to serialize
        - conn: Database connection (unused)

    Returns:
        Unchanged cell value
    """
```

## Usage Examples

### Basic Database Connection

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Create a hook for the default MySQL connection
hook = MySqlHook(mysql_conn_id='mysql_default')

# Execute a parameterized query and fetch all rows
records = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=(True,))

# Execute a statement without fetching results
user_id = 42  # example value
hook.run('UPDATE users SET last_login = NOW() WHERE id = %s', parameters=(user_id,))
```

### Bulk Data Loading

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# local_infile=True enables LOAD DATA LOCAL INFILE on the client side
hook = MySqlHook(mysql_conn_id='mysql_default', local_infile=True)

# Simple bulk load (tab-delimited)
hook.bulk_load('staging_table', '/tmp/data.tsv')

# Custom bulk load of a comma-separated file, replacing rows with duplicate keys
hook.bulk_load_custom(
    table='users',
    tmp_file='/tmp/users.csv',
    duplicate_key_handling='REPLACE',
    extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
)
```
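
bulk_load issues LOAD DATA without a FIELDS or LINES clause, so the file must use MySQL's defaults. Fields are separated by tabs and lines end with a newline. A backslash is the escape character, and \N stands for NULL. Here is a sketch of a hypothetical write_tsv helper that writes Python rows in that format. The escaping covers only those defaults. UTF-8 output is an assumption and should match the table's character set.

```python
from typing import Any, Iterable, Sequence


def _escape(value: Any) -> str:
    # None becomes \N, which LOAD DATA reads as SQL NULL under the default escape character
    if value is None:
        return "\\N"
    text = str(value)
    # Escape the escape character first, then the field and line terminators
    return (
        text.replace("\\", "\\\\")
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def to_tsv_lines(rows: Iterable[Sequence[Any]]) -> str:
    # One tab-separated, newline-terminated line per row
    return "".join("\t".join(_escape(v) for v in row) + "\n" for row in rows)


def write_tsv(path: str, rows: Iterable[Sequence[Any]]) -> None:
    # newline="" stops Python from translating "\n" into "\r\n" on Windows
    with open(path, "w", encoding="utf-8", newline="") as fh:
        fh.write(to_tsv_lines(rows))
```

For instance, calling write_tsv('/tmp/data.tsv', rows) before hook.bulk_load('staging_table', '/tmp/data.tsv') would load the rows, provided the column order in each row matches the table.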

### AWS IAM Authentication

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Configure the Airflow connection for IAM authentication
# Connection extra: {"iam": true, "aws_conn_id": "aws_default"}

hook = MySqlHook(mysql_conn_id='mysql_iam_conn')
connection = hook.get_conn()  # Authenticates with a temporary IAM token
```

### Connection URI Generation

```python
hook = MySqlHook(mysql_conn_id='mysql_default')
uri = hook.get_uri()  # e.g. mysql://user:pass@host:port/database with mysqlclient
```

## Type Definitions

```python { .api }
# MySQL connection type union
MySQLConnectionTypes = Union[MySQLdbConnection, MySQLConnectionAbstract]

# Connection extra configuration
ConnectionExtra = {
    "charset": str,      # Character encoding (e.g., "utf8")
    "cursor": str,       # Cursor type ("SSCursor", "DictCursor", "SSDictCursor")
    "ssl": dict,         # SSL configuration dictionary
    "ssl_mode": str,     # SSL mode ("REQUIRED", "PREFERRED", etc.)
    "unix_socket": str,  # Unix socket path
    "client": str,       # Client library ("mysqlclient", "mysql-connector-python")
    "iam": bool,         # Enable AWS IAM authentication
    "aws_conn_id": str,  # AWS connection ID for IAM
}
```
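
Here is a minimal sketch of checking a connection extra against the keys listed above before saving it. The function check_extra and its rules are hypothetical. It recognizes only the keys in the ConnectionExtra listing, although the hook may accept others. It also compares cursor names exactly as spelled there, which may be stricter than the hook itself.

```python
import json
from typing import Any

# Keys and expected Python types taken from the ConnectionExtra listing
EXPECTED_TYPES: dict[str, type] = {
    "charset": str,
    "cursor": str,
    "ssl": dict,
    "ssl_mode": str,
    "unix_socket": str,
    "client": str,
    "iam": bool,
    "aws_conn_id": str,
}
CLIENTS = {"mysqlclient", "mysql-connector-python"}
CURSORS = {"SSCursor", "DictCursor", "SSDictCursor"}


def check_extra(extra_json: str) -> list[str]:
    """Return the problems found in a connection extra (empty list if none)."""
    extra: dict[str, Any] = json.loads(extra_json)
    problems = []
    for key, value in extra.items():
        expected = EXPECTED_TYPES.get(key)
        if expected is None:
            problems.append(f"unlisted key: {key}")
        elif not isinstance(value, expected):
            problems.append(f"{key} should be {expected.__name__}")
    # mysqlclient is assumed when no client is named
    client = extra.get("client", "mysqlclient")
    if isinstance(client, str) and client not in CLIENTS:
        problems.append(f"unsupported client: {client}")
    cursor = extra.get("cursor")
    if isinstance(cursor, str) and cursor not in CURSORS:
        problems.append(f"unsupported cursor: {cursor}")
    return problems
```

For example, check_extra('{"client": "pymysql"}') reports the unsupported client, while the IAM extra shown earlier passes with no problems.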