# Data Transfer Operations

Transfer operators for moving data from various source systems into MySQL tables. These operators provide high-level task definitions for Airflow DAGs, supporting bulk loading, transformation options, and integration with multiple data sources.

## Capabilities

### S3 to MySQL Transfer

Transfer data from Amazon S3 files directly into MySQL tables using bulk loading operations.

```python { .api }
class S3ToMySqlOperator(BaseOperator):
    """
    Load a file from S3 into a MySQL table.

    Template fields: s3_source_key, mysql_table
    """

    def __init__(
        self,
        s3_source_key: str,
        mysql_table: str,
        mysql_duplicate_key_handling: str = "IGNORE",
        mysql_extra_options: str | None = None,
        aws_conn_id: str = "aws_default",
        mysql_conn_id: str = "mysql_default",
        mysql_local_infile: bool = False,
        **kwargs
    ):
        """
        Initialize S3 to MySQL transfer operator.

        Parameters:
        - s3_source_key: S3 key path to source file (templated)
        - mysql_table: Target MySQL table name (templated)
        - mysql_duplicate_key_handling: Handle duplicates ("IGNORE" or "REPLACE")
        - mysql_extra_options: Additional MySQL LOAD DATA options
        - aws_conn_id: S3 connection ID for credentials
        - mysql_conn_id: MySQL connection ID
        - mysql_local_infile: Enable local_infile option on MySQLHook
        """

    def execute(self, context: Context) -> None:
        """
        Execute S3 to MySQL data transfer.

        Downloads file from S3 and loads into MySQL using bulk operations.
        """
```

### Vertica to MySQL Transfer

Transfer data from Vertica databases to MySQL with support for both bulk loading and regular insert operations.

```python { .api }
class VerticaToMySqlOperator(BaseOperator):
    """
    Move data from Vertica to MySQL.

    Template fields: sql, mysql_table, mysql_preoperator, mysql_postoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        vertica_conn_id: str = "vertica_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        **kwargs
    ):
        """
        Initialize Vertica to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Vertica database (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - vertica_conn_id: Source Vertica connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before import (templated)
        - mysql_postoperator: SQL statement to run after import (templated)
        - bulk_load: Use LOAD DATA LOCAL INFILE for bulk operations
        """

    def execute(self, context: Context):
        """
        Execute Vertica to MySQL data transfer.

        Supports both bulk load (via temporary files) and regular insert operations.
        """
```
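
The `bulk_load` path avoids per-row inserts by staging the query results in a temporary file and handing that file to MySQL in a single statement. A minimal sketch of the idea, not the provider's actual implementation (`bulk_load_rows` and its arguments are illustrative, and it assumes a DB-API connection opened with `local_infile` enabled):

```python
import csv
import tempfile

def bulk_load_rows(mysql_conn, rows, table):
    """Stage rows in a temporary TSV file, then bulk load it into MySQL."""
    with tempfile.NamedTemporaryFile("w", newline="", suffix=".tsv") as tmp:
        csv.writer(tmp, delimiter="\t").writerows(rows)
        tmp.flush()  # ensure all rows are on disk before MySQL reads the file
        cursor = mysql_conn.cursor()
        # One server round trip instead of one INSERT per row.
        cursor.execute(f"LOAD DATA LOCAL INFILE '{tmp.name}' INTO TABLE {table}")
    mysql_conn.commit()
```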

### Presto to MySQL Transfer

Transfer data from Presto queries to MySQL tables using in-memory operations for small to medium datasets.

```python { .api }
class PrestoToMySqlOperator(BaseOperator):
    """
    Move data from Presto to MySQL.

    Note: Data is loaded into memory, suitable for small amounts of data.
    Template fields: sql, mysql_table, mysql_preoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        presto_conn_id: str = "presto_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs
    ):
        """
        Initialize Presto to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Presto (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - presto_conn_id: Source Presto connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before import (templated)
        """

    def execute(self, context: Context) -> None:
        """
        Execute Presto to MySQL data transfer.

        Loads query results into memory before inserting into MySQL.
        """
```
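
Both the Presto operator above and the Trino operator below follow the same fetch-then-insert shape, which is why the full result set must fit in memory. A rough sketch of that flow (the wiring is illustrative; `get_records`, `run`, and `insert_rows` are standard Airflow `DbApiHook` methods):

```python
def transfer_in_memory(source_hook, mysql_hook, sql, table, preoperator=None):
    # The entire result set is materialized here -- the reason these
    # operators are recommended only for small amounts of data.
    rows = source_hook.get_records(sql)
    if preoperator:
        mysql_hook.run(preoperator)  # e.g. DELETE old rows or CREATE TABLE
    mysql_hook.insert_rows(table=table, rows=rows)
```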

### Trino to MySQL Transfer

Transfer data from Trino queries to MySQL tables using in-memory operations for small to medium datasets.

```python { .api }
class TrinoToMySqlOperator(BaseOperator):
    """
    Move data from Trino to MySQL.

    Note: Data is loaded into memory, suitable for small amounts of data.
    Template fields: sql, mysql_table, mysql_preoperator
    """

    def __init__(
        self,
        sql: str,
        mysql_table: str,
        trino_conn_id: str = "trino_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        **kwargs
    ):
        """
        Initialize Trino to MySQL transfer operator.

        Parameters:
        - sql: SQL query to execute against Trino (templated)
        - mysql_table: Target MySQL table, supports dot notation (templated)
        - trino_conn_id: Source Trino connection ID
        - mysql_conn_id: Target MySQL connection ID
        - mysql_preoperator: SQL statement to run before import (templated)
        """

    def execute(self, context: Context) -> None:
        """
        Execute Trino to MySQL data transfer.

        Loads query results into memory before inserting into MySQL.
        """
```

## Usage Examples

### S3 to MySQL Transfer

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator

dag = DAG('s3_mysql_transfer', start_date=datetime(2024, 1, 1))

# Basic S3 to MySQL transfer
s3_to_mysql = S3ToMySqlOperator(
    task_id='load_users_from_s3',
    s3_source_key='data/users/{{ ds }}/users.csv',
    mysql_table='staging.users',
    mysql_duplicate_key_handling='REPLACE',
    mysql_extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '\"'",
    aws_conn_id='aws_default',
    mysql_conn_id='mysql_default',
    mysql_local_infile=True,
    dag=dag,
)
```

### Vertica to MySQL Transfer

```python
from airflow.providers.mysql.transfers.vertica_to_mysql import VerticaToMySqlOperator

# Bulk load transfer from Vertica
vertica_to_mysql_bulk = VerticaToMySqlOperator(
    task_id='transfer_vertica_bulk',
    sql='''
        SELECT user_id, username, email, created_date
        FROM users
        WHERE created_date >= '{{ ds }}'
    ''',
    mysql_table='staging.users',
    mysql_preoperator='TRUNCATE TABLE staging.users',
    mysql_postoperator='CALL update_user_stats()',
    bulk_load=True,
    vertica_conn_id='vertica_default',
    mysql_conn_id='mysql_default',
    dag=dag,
)

# Regular insert transfer
vertica_to_mysql_insert = VerticaToMySqlOperator(
    task_id='transfer_vertica_insert',
    sql="SELECT * FROM daily_metrics WHERE date = '{{ ds }}'",
    mysql_table='analytics.daily_metrics',
    bulk_load=False,
    dag=dag,
)
```

### Presto to MySQL Transfer

```python
from airflow.providers.mysql.transfers.presto_to_mysql import PrestoToMySqlOperator

# Transfer aggregated data from Presto
presto_to_mysql = PrestoToMySqlOperator(
    task_id='load_presto_aggregates',
    sql='''
        SELECT
            date_trunc('day', event_time) AS event_date,
            event_type,
            count(*) AS event_count
        FROM events
        WHERE event_time >= date('{{ ds }}')
        GROUP BY 1, 2
    ''',
    mysql_table='analytics.event_summary',
    mysql_preoperator="DELETE FROM analytics.event_summary WHERE event_date = '{{ ds }}'",
    presto_conn_id='presto_default',
    mysql_conn_id='mysql_default',
    dag=dag,
)
```

257

258

### Trino to MySQL Transfer

259

260

```python
from airflow.providers.mysql.transfers.trino_to_mysql import TrinoToMySqlOperator

# Transfer processed data from Trino
trino_to_mysql = TrinoToMySqlOperator(
    task_id='load_trino_results',
    sql='''
        SELECT
            customer_id,
            product_category,
            sum(purchase_amount) AS total_spent
        FROM purchases
        WHERE purchase_date = date('{{ ds }}')
        GROUP BY customer_id, product_category
    ''',
    mysql_table='customer_analytics.daily_spending',
    mysql_preoperator='''
        CREATE TABLE IF NOT EXISTS customer_analytics.daily_spending (
            customer_id INT,
            product_category VARCHAR(100),
            total_spent DECIMAL(10,2),
            load_date DATE DEFAULT (CURDATE())  -- expression default; MySQL 8.0.13+
        )
    ''',
    trino_conn_id='trino_default',
    mysql_conn_id='mysql_default',
    dag=dag,
)
```

## Transfer Operation Patterns

### Template Variables

All transfer operators support Airflow templating for dynamic values:

```python
# Template variables are rendered in the sql, mysql_table, and
# pre/postoperator fields before execution.
sql_with_templates = '''
    SELECT * FROM events
    WHERE event_date = '{{ ds }}'           -- current DAG run date
      AND event_time >= '{{ ts }}'          -- current DAG run timestamp
      AND user_id IN {{ params.user_ids }}  -- custom parameters
'''

mysql_table_template = 'staging.events_{{ ds_nodash }}'  # table with date suffix
```
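
A `{{ params.user_ids }}` reference only resolves when a `params` mapping is passed to the task; `params` is a standard `BaseOperator` argument. A hypothetical task wiring up the templates above:

```python
# Illustrative task; the params dict supplies {{ params.user_ids }}.
load_filtered_events = PrestoToMySqlOperator(
    task_id='load_filtered_events',
    sql=sql_with_templates,
    mysql_table=mysql_table_template,
    params={'user_ids': '(1, 2, 3)'},
    dag=dag,
)
```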

### Error Handling and Retries

```python
from datetime import timedelta

# Configure retry behavior for transfer operations
transfer_operator = S3ToMySqlOperator(
    task_id='s3_transfer',
    s3_source_key='data/file.csv',
    mysql_table='staging.data',
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag,
)
```

### Data Quality Checks

```python
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

def validate_transfer_results():
    # Fail the task if the target table is empty after the transfer.
    hook = MySqlHook(mysql_conn_id='mysql_default')
    count = hook.get_first('SELECT COUNT(*) FROM staging.users')[0]
    if count == 0:
        raise ValueError("No data transferred")
    return count

# Add validation after transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_results,
    dag=dag,
)

s3_to_mysql >> validate_task
```

## Type Definitions

```python { .api }
from typing import Any, Dict

# Base operator context for all transfer operations
Context = Dict[str, Any]

# Transfer operation configuration
TransferConfig = {
    "source_conn_id": str,       # Source system connection ID
    "mysql_conn_id": str,        # MySQL connection ID (default: "mysql_default")
    "mysql_table": str,          # Target table (supports database.table notation)
    "mysql_preoperator": str,    # SQL to run before transfer (optional)
    "mysql_postoperator": str,   # SQL to run after transfer (optional)
}

# S3-specific configuration
S3TransferConfig = {
    "s3_source_key": str,                 # S3 object key (templated)
    "aws_conn_id": str,                   # AWS connection (default: "aws_default")
    "mysql_duplicate_key_handling": str,  # "IGNORE" or "REPLACE"
    "mysql_extra_options": str,           # Additional LOAD DATA options
    "mysql_local_infile": bool,           # Enable local_infile feature
}

# Bulk load configuration for Vertica transfers
BulkLoadConfig = {
    "bulk_load": bool,      # Enable bulk loading via temporary files
    "tmp_file_path": str,   # Temporary file location for bulk operations
}
```
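
For static checking, the same shapes can be expressed as `TypedDict`s. A minimal sketch (these class names are illustrative, not part of the provider API):

```python
from typing import TypedDict

class TransferConfigDict(TypedDict, total=False):
    source_conn_id: str
    mysql_conn_id: str
    mysql_table: str
    mysql_preoperator: str   # optional: SQL to run before transfer
    mysql_postoperator: str  # optional: SQL to run after transfer
```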