# Database Operations

Core MySQL database connectivity and operations through MySqlHook. The hook covers connection management, query execution, bulk loading and dumping, AWS IAM authentication, OpenLineage metadata, and support for multiple MySQL client libraries.

## Capabilities

### MySQL Hook

The MySqlHook class extends Airflow's DbApiHook to provide MySQL-specific functionality, with support for multiple client libraries and authentication methods.

```python { .api }
class MySqlHook(DbApiHook):
    """
    Interact with MySQL databases.

    Attributes:
    - conn_name_attr: "mysql_conn_id"
    - default_conn_name: "mysql_default"
    - conn_type: "mysql"
    - hook_name: "MySQL"
    - supports_autocommit: True
    """

    def __init__(
        self,
        *args,
        schema: str | None = None,
        local_infile: bool = False,
        init_command: str | None = None,
        **kwargs,
    ):
        """
        Initialize MySQL hook.

        Parameters:
        - schema: MySQL database (schema) to connect to; takes precedence over the schema set on the Airflow connection
        - local_infile: Enable the local_infile feature on the client connection (default: False)
        - init_command: Initial SQL command to issue upon connection
        """
```
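The connection-id attributes above decide which Airflow connection a hook uses. The sketch below is a simplified stand-in, not the real class: `ConnIdResolutionSketch` is a hypothetical name, and its resolution order (a single positional argument, then the `mysql_conn_id` keyword, then `mysql_default`) is assumed to mirror `DbApiHook.__init__`.

```python
from __future__ import annotations

from typing import Any


class ConnIdResolutionSketch:
    """Hypothetical stand-in mimicking how a DbApiHook subclass picks its connection id."""

    conn_name_attr = "mysql_conn_id"
    default_conn_name = "mysql_default"

    def __init__(
        self,
        *args: Any,
        schema: str | None = None,
        local_infile: bool = False,
        init_command: str | None = None,
        **kwargs: Any,
    ) -> None:
        # In this sketch the MySQL-specific options are simply stored.
        self.schema = schema
        self.local_infile = local_infile
        self.init_command = init_command
        if len(args) == 1:
            # A single positional argument is treated as the connection id.
            conn_id = args[0]
        else:
            # Otherwise use the keyword named by conn_name_attr, or fall back to the default.
            conn_id = kwargs.get(self.conn_name_attr, self.default_conn_name)
        setattr(self, self.conn_name_attr, conn_id)


print(ConnIdResolutionSketch().mysql_conn_id)  # mysql_default
print(ConnIdResolutionSketch("reporting_db").mysql_conn_id)  # reporting_db
print(ConnIdResolutionSketch(mysql_conn_id="warehouse", local_infile=True).mysql_conn_id)  # warehouse
```

Routing the id through `conn_name_attr` rather than a hard-coded attribute name is what lets one base class serve many database-specific hooks.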

### Connection Management

Establish and manage connections to MySQL databases with support for multiple client libraries and authentication methods.

```python { .api }
def get_conn(self) -> MySQLConnectionTypes:
    """
    Get a connection to a MySQL database.

    Builds the connection configuration from the Airflow connection.
    Supports the mysqlclient (default) and mysql-connector-python libraries,
    selected through the "client" key of the connection extra.

    Returns:
        MySQL connection object (MySQLdb or mysql.connector connection)

    Raises:
        RuntimeError: If the mysqlclient library is required but not installed
        ValueError: If an unknown MySQL client name is provided
        AirflowOptionalProviderFeatureException: If an optional dependency
            (e.g. mysql-connector-python) is missing
    """

def get_uri(self) -> str:
    """
    Get the URI for the MySQL connection.

    Generates the connection URI based on the client library and connection parameters.

    Returns:
        Connection URI string (mysql:// for mysqlclient,
        mysql+mysqlconnector:// for mysql-connector-python)
    """
```
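To make the two client paths concrete, here is a pure-Python sketch of how a client name could map to a driver module and URI prefix. `resolve_client`, `build_mysql_uri` and the `CLIENTS` table are hypothetical helpers, not part of the provider; the prefixes come from the `get_uri` docstring above, while percent-encoding the credentials is an assumption about what a well-formed URI needs.

```python
from __future__ import annotations

from urllib.parse import quote_plus

# Hypothetical mapping: "client" extra value -> (driver module, URI prefix).
CLIENTS = {
    "mysqlclient": ("MySQLdb", "mysql://"),
    "mysql-connector-python": ("mysql.connector", "mysql+mysqlconnector://"),
}


def resolve_client(extra: dict) -> tuple[str, str]:
    """Return (driver module, URI prefix) for the client named in the connection extra."""
    client_name = extra.get("client", "mysqlclient")  # mysqlclient is the default
    if client_name not in CLIENTS:
        raise ValueError(f"Unknown MySQL client name provided: {client_name!r}")
    return CLIENTS[client_name]


def build_mysql_uri(
    user: str, password: str, host: str, port: int | None, schema: str, extra: dict
) -> str:
    """Assemble a MySQL URI; user and password are percent-encoded."""
    _, prefix = resolve_client(extra)
    auth = f"{quote_plus(user)}:{quote_plus(password)}@"
    netloc = host if port is None else f"{host}:{port}"
    return f"{prefix}{auth}{netloc}/{schema}"


print(build_mysql_uri("app", "p@ss", "db.example.com", 3306, "sales", {}))
# mysql://app:p%40ss@db.example.com:3306/sales
print(build_mysql_uri("app", "secret", "localhost", None, "sales", {"client": "mysql-connector-python"}))
# mysql+mysqlconnector://app:secret@localhost/sales
```

Encoding the password matters because characters such as `@` or `/` would otherwise be read as URI delimiters.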

### Autocommit Control

Manage transaction autocommit behavior across the supported MySQL client libraries.

```python { .api }
def set_autocommit(self, conn: MySQLConnectionTypes, autocommit: bool) -> None:
    """
    Set autocommit mode for a MySQL connection.

    Handles the difference between mysqlclient (autocommit is a method) and
    mysql-connector-python (autocommit is a property).

    Parameters:
    - conn: MySQL connection object
    - autocommit: Enable/disable autocommit
    """

def get_autocommit(self, conn: MySQLConnectionTypes) -> bool:
    """
    Get the current autocommit setting for a MySQL connection.

    Parameters:
    - conn: MySQL connection object

    Returns:
        Current autocommit setting (True/False)
    """
```
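The method-versus-property difference is easiest to see with stand-ins. This sketch uses two hypothetical fake connection classes and detects the property on the class; it illustrates the idea only, and the hook's own detection logic may differ.

```python
class MethodStyleConn:
    """Hypothetical stand-in for a mysqlclient connection: autocommit is a method."""

    def __init__(self) -> None:
        self._autocommit = False

    def autocommit(self, on: bool) -> None:
        self._autocommit = on

    def get_autocommit(self) -> bool:
        return self._autocommit


class PropertyStyleConn:
    """Hypothetical stand-in for a mysql-connector-python connection: autocommit is a property."""

    def __init__(self) -> None:
        self._autocommit = False

    @property
    def autocommit(self) -> bool:
        return self._autocommit

    @autocommit.setter
    def autocommit(self, on: bool) -> None:
        self._autocommit = on


def _is_property_style(conn) -> bool:
    # Properties live on the class, so inspect the class attribute, not the instance value.
    return isinstance(getattr(type(conn), "autocommit", None), property)


def set_autocommit(conn, autocommit: bool) -> None:
    if _is_property_style(conn):
        conn.autocommit = autocommit
    else:
        conn.autocommit(autocommit)


def get_autocommit(conn) -> bool:
    if _is_property_style(conn):
        return conn.autocommit
    return conn.get_autocommit()


for conn in (MethodStyleConn(), PropertyStyleConn()):
    set_autocommit(conn, True)
    print(type(conn).__name__, get_autocommit(conn))
# MethodStyleConn True
# PropertyStyleConn True
```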

### Bulk Data Operations

Load and dump large datasets efficiently with MySQL's native bulk statements (LOAD DATA and SELECT ... INTO OUTFILE).

```python { .api }
def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load a tab-delimited file into a database table using LOAD DATA LOCAL INFILE.

    Parameters:
    - table: Target table name (validated for safety)
    - tmp_file: Path to the tab-delimited file on the machine running the hook

    Raises:
        ValueError: If the table name contains invalid characters
    """

def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump a database table into a tab-delimited file using SELECT INTO OUTFILE.

    SELECT ... INTO OUTFILE writes the file on the MySQL server host,
    not on the machine running the hook.

    Parameters:
    - table: Source table name (validated for safety)
    - tmp_file: Output file path on the MySQL server host

    Raises:
        ValueError: If the table name contains invalid characters
    """

def bulk_load_custom(
    self,
    table: str,
    tmp_file: str,
    duplicate_key_handling: str = "IGNORE",
    extra_options: str = "",
) -> None:
    """
    Load data with configurable options using LOAD DATA LOCAL INFILE.

    Warning: the MySQL documentation describes LOAD DATA LOCAL as a security
    risk. It only works when local_infile is enabled on both the client and
    the server.

    Parameters:
    - table: Target table name
    - tmp_file: Path to the data file
    - duplicate_key_handling: "IGNORE" or "REPLACE" for rows that duplicate a unique key
    - extra_options: Additional SQL options for the LOAD DATA statement
    """
```
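Because bulk_load expects a tab-delimited file, the input has to follow LOAD DATA's default format, assuming no FIELDS or LINES clause is added: tab-separated fields, newline-terminated lines, backslash as the escape character, and `\N` for NULL. The sketch below writes such lines and builds a statement with a validated, backtick-quoted table name. `tsv_field`, `tsv_line`, `quote_table_name`, `load_statement` and the identifier allow-list are hypothetical, not the provider's own code.

```python
from __future__ import annotations

import re

# Hypothetical allow-list for identifiers: letters, digits and underscores only.
_IDENT = re.compile(r"[A-Za-z0-9_]+")


def quote_table_name(table: str) -> str:
    """Validate "table" or "database.table" and return it backtick-quoted."""
    parts = table.split(".")
    if len(parts) > 2 or not all(_IDENT.fullmatch(p) for p in parts):
        raise ValueError(f"Invalid table name: {table!r}")
    return ".".join(f"`{p}`" for p in parts)


def tsv_field(value: object) -> str:
    """Escape one value with LOAD DATA's default rules (backslash escapes, \\N for NULL)."""
    if value is None:
        return "\\N"
    text = str(value).replace("\\", "\\\\")  # escape the escape character first
    return text.replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r")


def tsv_line(row: list[object]) -> str:
    """Join escaped fields with tabs and terminate the line with a newline."""
    return "\t".join(tsv_field(v) for v in row) + "\n"


def load_statement(table: str) -> str:
    """LOAD DATA with the file path left as a bound %s parameter."""
    return f"LOAD DATA LOCAL INFILE %s INTO TABLE {quote_table_name(table)}"


print(repr(tsv_line(["a\tb", None, 3])))  # 'a\\tb\t\\N\t3\n'
print(load_statement("staging.users"))
# LOAD DATA LOCAL INFILE %s INTO TABLE `staging`.`users`
```

Binding the file path as a parameter while only interpolating a validated identifier keeps user input out of the SQL text.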

### AWS IAM Authentication

Support for AWS IAM database authentication, which replaces a stored password with a short-lived, token-based one.

```python { .api }
def get_iam_token(self, conn: Connection) -> tuple[str, int]:
    """
    Retrieve a temporary password for AWS IAM authentication to MySQL.

    Uses the AWS RDS generate_db_auth_token API to create the temporary password.

    Parameters:
    - conn: Airflow connection with IAM configuration

    Returns:
        Tuple of (temporary_password, port)

    Configuration in connection extra:
        {"iam": true, "aws_conn_id": "aws_default"}
    """
```
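The IAM switch lives entirely in the connection extra. The sketch below shows how those keys could be read; `iam_settings` is a hypothetical helper, and both the `aws_default` fallback and the fallback to port 3306 when the connection has no port are assumptions. The token itself is produced by AWS (with boto3, the RDS client's `generate_db_auth_token(DBHostname, Port, DBUsername)` call), which is not reproduced here.

```python
from __future__ import annotations

import json


def iam_settings(extra_json: str | None, port: int | None) -> tuple[bool, str, int]:
    """Return (use_iam, aws_conn_id, port) from a connection's extra JSON and port."""
    extra = json.loads(extra_json) if extra_json else {}
    use_iam = bool(extra.get("iam", False))
    # Assumed fallback, matching the example extra above.
    aws_conn_id = extra.get("aws_conn_id", "aws_default")
    # Assumed fallback to MySQL's standard port when the connection has none.
    return use_iam, aws_conn_id, port or 3306


print(iam_settings('{"iam": true, "aws_conn_id": "aws_prod"}', None))  # (True, 'aws_prod', 3306)
print(iam_settings(None, 3307))  # (False, 'aws_default', 3307)
```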

### OpenLineage Integration

Data lineage and metadata support for OpenLineage tracking systems.

```python { .api }
def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return MySQL-specific information for OpenLineage data lineage.

    Parameters:
    - connection: Airflow connection

    Returns:
        DatabaseInfo object with the MySQL scheme and authority (host:port) information
    """

def get_openlineage_database_dialect(self, _) -> str:
    """
    Return the database dialect identifier.

    Returns:
        "mysql"
    """

def get_openlineage_default_schema(self) -> None:
    """
    Return the default schema. In MySQL a schema is the same thing as a
    database, so there is no separate default schema and this returns None.

    Returns:
        None
    """
```
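OpenLineage identifies a dataset by a namespace and a name. Assuming the OpenLineage naming convention for MySQL (namespace `mysql://{host}:{port}`, name `{database}.{table}`), the sketch below shows how the scheme, the authority, and the absence of a separate schema level combine. The helper names and the 3306 default are assumptions for illustration.

```python
from __future__ import annotations


def openlineage_namespace(host: str, port: int | None) -> str:
    """Scheme ("mysql") plus authority (host:port); 3306 is assumed when port is unset."""
    return f"mysql://{host}:{port or 3306}"


def openlineage_dataset_name(database: str, table: str) -> str:
    """Database-qualified table name; MySQL has no schema level below the database."""
    return f"{database}.{table}"


print(openlineage_namespace("db.internal", None))  # mysql://db.internal:3306
print(openlineage_dataset_name("sales", "orders"))  # sales.orders
```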

### Data Serialization

Prepare cell values for database operations. For MySQL this is a pass-through, because the driver performs type conversion itself.

```python { .api }
@staticmethod
def _serialize_cell(cell: object, conn: Connection | None = None) -> Any:
    """
    Convert an argument to a database literal.

    MySQLdb handles serialization automatically, so this method
    returns the cell unchanged.

    Parameters:
    - cell: Data to serialize
    - conn: Database connection (unused)

    Returns:
        Unchanged cell value
    """
```
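Since _serialize_cell returns values unchanged, they reach the driver as bound parameters rather than as SQL literals. The sketch below shows that flow for a single row; `serialize_cell` and `build_insert` are hypothetical and only loosely modeled on how a DbApiHook-style row insert pairs `%s` placeholders with serialized cells.

```python
from __future__ import annotations

from datetime import date
from typing import Any, Iterable


def serialize_cell(cell: object, conn: Any = None) -> Any:
    """Pass-through, like the documented MySQL behaviour: the driver converts values."""
    return cell


def build_insert(table: str, fields: list[str], row: Iterable[object]) -> tuple[str, tuple[Any, ...]]:
    """Return an INSERT with one %s placeholder per value, plus the values to bind."""
    values = tuple(serialize_cell(cell) for cell in row)
    placeholders = ", ".join(["%s"] * len(values))
    return f"INSERT INTO {table} ({', '.join(fields)}) VALUES ({placeholders})", values


sql, params = build_insert("users", ["id", "name", "joined"], [1, None, date(2024, 1, 31)])
print(sql)     # INSERT INTO users (id, name, joined) VALUES (%s, %s, %s)
print(params)  # (1, None, datetime.date(2024, 1, 31))
```

The `date` and `None` values stay as Python objects; converting them to MySQL literals is left to the driver's parameter binding.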

## Usage Examples

### Basic Database Connection

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# Create a hook that uses the default connection
hook = MySqlHook(mysql_conn_id='mysql_default')

# Execute a parameterized query and fetch all rows
records = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=(True,))

# Execute a single statement; user_id is a placeholder value for illustration
user_id = 42
hook.run('UPDATE users SET last_login = NOW() WHERE id = %s', parameters=(user_id,))
```

### Bulk Data Loading

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# local_infile=True enables LOAD DATA LOCAL INFILE on the client side
hook = MySqlHook(mysql_conn_id='mysql_default', local_infile=True)

# Simple bulk load of a tab-delimited file
hook.bulk_load('staging_table', '/tmp/data.tsv')

# Custom bulk load of a CSV file, replacing rows that duplicate a unique key
hook.bulk_load_custom(
    table='users',
    tmp_file='/tmp/users.csv',
    duplicate_key_handling='REPLACE',
    extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
)
```

### AWS IAM Authentication

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

# The connection 'mysql_iam_conn' is configured with this extra:
# {"iam": true, "aws_conn_id": "aws_default"}
hook = MySqlHook(mysql_conn_id='mysql_iam_conn')
connection = hook.get_conn()  # Authenticates with a temporary IAM token instead of a stored password
```

### Connection URI Generation

```python
from airflow.providers.mysql.hooks.mysql import MySqlHook

hook = MySqlHook(mysql_conn_id='mysql_default')
uri = hook.get_uri()  # e.g. mysql://user:pass@host:port/database
```

## Type Definitions

```python { .api }
# MySQL connection type union
MySQLConnectionTypes = Union[MySQLdbConnection, MySQLConnectionAbstract]

# Connection extra configuration
ConnectionExtra = {
    "charset": str,      # Character encoding (e.g., "utf8")
    "cursor": str,       # Cursor type ("SSCursor", "DictCursor", "SSDictCursor")
    "ssl": dict,         # SSL configuration dictionary
    "ssl_mode": str,     # SSL mode ("REQUIRED", "PREFERRED", etc.)
    "unix_socket": str,  # Unix socket path
    "client": str,       # Client library ("mysqlclient", "mysql-connector-python")
    "iam": bool,         # Enable AWS IAM authentication
    "aws_conn_id": str,  # AWS connection ID for IAM
}
```
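Airflow stores a connection's extra as a JSON string. The sketch below gives the keys listed above a `TypedDict` so a type checker can catch typos, then serializes an example value; the class name `MySqlConnectionExtra` is hypothetical and not a type the provider exports.

```python
import json
from typing import TypedDict


class MySqlConnectionExtra(TypedDict, total=False):
    """Hypothetical typed view of the connection extra keys; every key is optional."""

    charset: str
    cursor: str
    ssl: dict
    ssl_mode: str
    unix_socket: str
    client: str
    iam: bool
    aws_conn_id: str


extra: MySqlConnectionExtra = {
    "charset": "utf8",
    "client": "mysqlclient",
    "iam": True,
    "aws_conn_id": "aws_default",
}

# The JSON string is what goes into the connection's "extra" field.
extra_json = json.dumps(extra)
print(extra_json)
# {"charset": "utf8", "client": "mysqlclient", "iam": true, "aws_conn_id": "aws_default"}
```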