
# Data Transfer Operations

Transfer data between Hive and external systems, including MySQL, Amazon S3, Samba, Vertica, and Microsoft SQL Server. The transfer operators move data in both directions and handle schema mapping, delimiters, quoting, and compression, covering the common ETL paths into and out of Hive.

## Capabilities

### MySQL to Hive Transfer

Transfer data from MySQL tables to Hive with automatic schema mapping and partition support.

```python { .api }
class MySqlToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        quoting: Literal[0, 1, 2, 3] = csv.QUOTE_MINIMAL,
        quotechar: str = '"',
        escapechar: str | None = None,
        mysql_conn_id: str = "mysql_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        tblproperties: dict | None = None,
        **kwargs
    ): ...

    @classmethod
    def type_map(cls, mysql_type: int) -> str: ...

    def execute(self, context: 'Context') -> None: ...
```
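
The `type_map` classmethod drives the automatic schema mapping: when `create=True`, each MySQL column's driver type code is translated into a Hive column type for the generated `CREATE TABLE`. Below is a minimal sketch of calling it directly; the `FIELD_TYPE` import and the expected outputs are assumptions, and unmapped types generally fall back to `STRING`.

```python
# Illustrative sketch only - the exact mapping depends on the provider version.
from MySQLdb.constants import FIELD_TYPE  # assumed source of MySQL type codes

from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator

# type_map takes the numeric field-type code reported by the MySQL driver and
# returns the Hive column type used when the operator creates the target table.
print(MySqlToHiveOperator.type_map(FIELD_TYPE.LONGLONG))    # expected: an integer type such as BIGINT
print(MySqlToHiveOperator.type_map(FIELD_TYPE.VAR_STRING))  # expected: STRING
```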

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.mysql_to_hive import MySqlToHiveOperator

# Basic MySQL to Hive transfer
mysql_to_hive = MySqlToHiveOperator(
    task_id='transfer_customer_data',
    sql='''
        SELECT customer_id, name, email, registration_date, status
        FROM customers
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    hive_table='warehouse.customers',
    partition={'ds': '{{ ds }}'},
    mysql_conn_id='mysql_prod',
    hive_cli_conn_id='hive_warehouse',
    create=True,
    dag=dag
)

# Complex transfer with table properties
advanced_mysql_transfer = MySqlToHiveOperator(
    task_id='advanced_mysql_transfer',
    sql='''
        SELECT o.order_id, o.customer_id, o.total_amount, o.order_date,
               GROUP_CONCAT(oi.product_id) AS product_ids,
               GROUP_CONCAT(oi.quantity) AS quantities
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        WHERE DATE(o.order_date) = '{{ ds }}'
        GROUP BY o.order_id, o.customer_id, o.total_amount, o.order_date
    ''',
    hive_table='analytics.order_summary',
    partition={'processing_date': '{{ ds }}'},
    delimiter='\t',
    tblproperties={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    dag=dag
)
```

### S3 to Hive Transfer

Transfer data from Amazon S3 to Hive tables with support for various file formats and compression.

```python { .api }
class S3ToHiveOperator:
    def __init__(
        self,
        *,
        s3_key: str,
        field_dict: dict,
        hive_table: str,
        delimiter: str = ",",
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        headers: bool = False,
        check_headers: bool = False,
        wildcard_match: bool = False,
        aws_conn_id: str | None = "aws_default",
        verify: bool | str | None = None,
        hive_cli_conn_id: str = "hive_cli_default",
        input_compressed: bool = False,
        tblproperties: dict | None = None,
        select_expression: str | None = None,
        hive_auth: str | None = None,
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
```

```python { .api }
def uncompress_file(input_file_name: str, file_extension: str, dest_dir: str) -> str: ...
```
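
`uncompress_file` is the helper used to expand compressed downloads before they are loaded into Hive. A minimal sketch of calling it directly, assuming it is importable from the same `s3_to_hive` module and that the local `.gz` path shown (which is hypothetical) already exists:

```python
from tempfile import TemporaryDirectory

# Assumed import location: the module that defines S3ToHiveOperator.
from airflow.providers.apache.hive.transfers.s3_to_hive import uncompress_file

with TemporaryDirectory() as dest_dir:
    # Expand a previously downloaded gzip file; the returned path points at
    # the uncompressed copy written inside dest_dir.
    extracted = uncompress_file(
        input_file_name='/tmp/application.log.gz',  # hypothetical local file
        file_extension='.gz',
        dest_dir=dest_dir,
    )
    print(extracted)
```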

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator

# Basic S3 to Hive transfer
s3_to_hive = S3ToHiveOperator(
    task_id='load_daily_logs',
    s3_key='logs/{{ ds }}/application.log.gz',
    # field_dict maps the file's columns, in order, to Hive column types
    field_dict={'log_timestamp': 'STRING', 'level': 'STRING', 'message': 'STRING'},
    hive_table='warehouse.application_logs',
    partition={'log_date': '{{ ds }}'},
    input_compressed=True,
    aws_conn_id='aws_data_lake',
    hive_cli_conn_id='hive_warehouse',
    create=True,
    dag=dag
)

# CSV transfer with header handling
csv_s3_transfer = S3ToHiveOperator(
    task_id='transfer_csv_data',
    s3_key='exports/daily_report_{{ ds }}.csv.gz',
    field_dict={'timestamp': 'STRING', 'amount': 'DOUBLE', 'count': 'INT'},
    hive_table='reports.daily_metrics',
    partition={'report_date': '{{ ds }}'},
    headers=True,
    check_headers=True,
    input_compressed=True,
    delimiter=',',
    dag=dag
)

# Region-specific transfer with templating
regional_transfer = S3ToHiveOperator(
    task_id='load_regional_data',
    s3_key='regional_data/{{ ds }}/{{ params.region }}_data.json.gz',
    field_dict={'event': 'STRING', 'value': 'STRING'},
    hive_table='analytics.regional_metrics',
    partition={'ds': '{{ ds }}', 'region': '{{ params.region }}'},
    params={'region': 'us'},
    input_compressed=True,
    tblproperties={'serialization.format': '1'},
    dag=dag
)
```
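
The documented `select_expression` parameter can push row filtering down to S3 Select so only matching rows are retrieved before the Hive load. A hedged sketch reusing the import above; the object key, columns, and expression are illustrative, and `s._2` refers to the second CSV column positionally:

```python
# Sketch: server-side filtering with S3 Select before loading into Hive.
# Assumes the object is in a format S3 Select supports (e.g. CSV).
filtered_s3_load = S3ToHiveOperator(
    task_id='load_active_users_only',
    s3_key='exports/users_{{ ds }}.csv',  # hypothetical object key
    select_expression="SELECT * FROM S3Object s WHERE s._2 = 'active'",
    field_dict={'user_id': 'STRING', 'status': 'STRING'},
    hive_table='warehouse.active_users',
    partition={'ds': '{{ ds }}'},
    dag=dag
)
```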

### Hive to MySQL Transfer

Export data from Hive tables to MySQL with support for incremental loads and data transformations.

```python { .api }
class HiveToMySqlOperator:
    def __init__(
        self,
        *,
        sql: str,
        mysql_table: str,
        hiveserver2_conn_id: str = "hiveserver2_default",
        mysql_conn_id: str = "mysql_default",
        mysql_preoperator: str | None = None,
        mysql_postoperator: str | None = None,
        bulk_load: bool = False,
        hive_conf: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator

# Basic Hive to MySQL export
hive_to_mysql = HiveToMySqlOperator(
    task_id='export_daily_summary',
    sql='''
        SELECT region, product_category,
               SUM(revenue) AS total_revenue,
               COUNT(*) AS transaction_count
        FROM warehouse.daily_sales
        WHERE ds = '{{ ds }}'
        GROUP BY region, product_category
    ''',
    mysql_table='reporting.daily_summary',
    mysql_preoperator='DELETE FROM reporting.daily_summary WHERE report_date = "{{ ds }}"',
    mysql_postoperator='UPDATE reporting.daily_summary SET last_updated = NOW() WHERE report_date = "{{ ds }}"',
    hiveserver2_conn_id='hive_analytics',
    mysql_conn_id='mysql_reporting',
    dag=dag
)

# Bulk load for large datasets
bulk_export = HiveToMySqlOperator(
    task_id='bulk_export_customers',
    sql='''
        SELECT customer_id, name, email, total_orders, lifetime_value
        FROM warehouse.customer_metrics
        WHERE ds = '{{ ds }}'
    ''',
    mysql_table='crm.customer_snapshot',
    bulk_load=True,
    mysql_preoperator='TRUNCATE TABLE crm.customer_snapshot',
    dag=dag
)
```
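
The documented `hive_conf` parameter passes extra Hive configuration to the extraction query run over HiveServer2. A hedged sketch reusing the import above; the task name and the specific settings shown are illustrative, not required:

```python
# Sketch: pass per-query Hive settings to the extraction step.
# The keys below are ordinary Hive/Hadoop configuration properties used as examples.
tuned_export = HiveToMySqlOperator(
    task_id='tuned_export',
    sql="SELECT region, SUM(revenue) FROM warehouse.daily_sales WHERE ds = '{{ ds }}' GROUP BY region",
    mysql_table='reporting.revenue_by_region',
    hive_conf={
        'hive.exec.parallel': 'true',
        'mapreduce.job.queuename': 'reporting',
    },
    hiveserver2_conn_id='hive_analytics',
    mysql_conn_id='mysql_reporting',
    dag=dag
)
```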

### Microsoft SQL Server to Hive Transfer

Transfer data from Microsoft SQL Server to Hive with support for complex data types and bulk operations.

```python { .api }
class MsSqlToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        mssql_conn_id: str = "mssql_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        tblproperties: dict | None = None,
        **kwargs
    ): ...

    @classmethod
    def type_map(cls, mssql_type: int) -> str: ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.mssql_to_hive import MsSqlToHiveOperator

# SQL Server to Hive transfer
mssql_to_hive = MsSqlToHiveOperator(
    task_id='transfer_sales_data',
    sql='''
        SELECT SalesOrderID, CustomerID, OrderDate, TotalDue, Status
        FROM Sales.SalesOrderHeader
        WHERE CAST(OrderDate AS DATE) = '{{ ds }}'
    ''',
    hive_table='warehouse.sales_orders',
    partition={'order_date': '{{ ds }}'},
    mssql_conn_id='mssql_prod',
    hive_cli_conn_id='hive_warehouse',
    dag=dag
)
```

### Vertica to Hive Transfer

Transfer data from Vertica to Hive with optimized bulk loading capabilities.

```python { .api }
class VerticaToHiveOperator:
    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: dict | None = None,
        delimiter: str = chr(1),
        vertica_conn_id: str = "vertica_default",
        hive_cli_conn_id: str = "hive_cli_default",
        hive_auth: str | None = None,
        **kwargs
    ): ...

    @classmethod
    def type_map(cls, vertica_type) -> str: ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.vertica_to_hive import VerticaToHiveOperator

# Vertica to Hive transfer
vertica_to_hive = VerticaToHiveOperator(
    task_id='transfer_analytics_data',
    sql='''
        SELECT customer_id, event_timestamp, event_type, properties
        FROM analytics.user_events
        WHERE DATE(event_timestamp) = DATE('{{ ds }}')
    ''',
    hive_table='warehouse.user_events',
    partition={'event_date': '{{ ds }}'},
    vertica_conn_id='vertica_analytics',
    hive_cli_conn_id='hive_warehouse',
    dag=dag
)

# Large dataset transfer
bulk_vertica_transfer = VerticaToHiveOperator(
    task_id='bulk_transfer_transactions',
    sql='''
        SELECT transaction_id, user_id, amount, merchant_id, transaction_timestamp
        FROM transactions.daily_transactions
        WHERE transaction_date = '{{ ds }}'
    ''',
    hive_table='warehouse.transactions',
    partition={'transaction_date': '{{ ds }}'},
    create=True,
    dag=dag
)
```

### Hive to Samba Transfer

Export data from Hive to Samba file shares for integration with legacy systems.

```python { .api }
class HiveToSambaOperator:
    def __init__(
        self,
        *,
        hql: str,
        destination_filepath: str,
        samba_conn_id: str = 'samba_default',
        hiveserver2_conn_id: str = 'hiveserver2_default',
        **kwargs
    ): ...

    def execute(self, context: 'Context') -> None: ...
```

**Usage Example:**

```python
from airflow.providers.apache.hive.transfers.hive_to_samba import HiveToSambaOperator

# Export to Samba share
hive_to_samba = HiveToSambaOperator(
    task_id='export_to_legacy_system',
    hql='''
        SELECT customer_id, order_date, product_code, quantity, amount
        FROM warehouse.orders
        WHERE ds = '{{ ds }}'
        ORDER BY customer_id, order_date
    ''',
    destination_filepath='/exports/daily_orders_{{ ds }}.csv',
    samba_conn_id='legacy_file_share',
    hiveserver2_conn_id='hive_reporting',
    dag=dag
)
```

## Transfer Configuration

### File Format Handling

Transfer operators support various file formats and compression:

```python
# CSV with custom formatting
csv_transfer = MySqlToHiveOperator(
    task_id='csv_transfer',
    sql='SELECT * FROM source_table',
    hive_table='target_table',
    delimiter=',',
    quoting=1,  # csv.QUOTE_ALL
    quotechar='"',
    escapechar='\\',
    dag=dag
)

# Tab-separated output with compression
tsv_transfer = S3ToHiveOperator(
    task_id='tsv_transfer',
    s3_key='data.tsv.gz',
    field_dict={'col_a': 'STRING', 'col_b': 'STRING'},
    hive_table='target_table',
    delimiter='\t',
    input_compressed=True,
    tblproperties={
        'hive.exec.compress.output': 'true',
        'mapred.output.compression.codec': 'org.apache.hadoop.io.compress.GzipCodec'
    },
    dag=dag
)
```

### Partition Management

Every transfer operator accepts a partition specification that controls where the loaded data lands:

```python
# Static partition values
static_partition = MySqlToHiveOperator(
    task_id='static_partition',
    sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",
    hive_table='warehouse.orders',
    partition={'ds': '{{ ds }}', 'region': 'us'},
    dag=dag
)

# Partitioned load with table recreation
dynamic_partition = S3ToHiveOperator(
    task_id='dynamic_partition',
    s3_key='multi_region_data_{{ ds }}.csv',
    field_dict={'region': 'STRING', 'metric': 'STRING', 'value': 'DOUBLE'},
    hive_table='warehouse.regional_data',
    partition={'processing_date': '{{ ds }}'},
    recreate=True,  # Drop and recreate the table before loading
    dag=dag
)
```

### Data Quality and Validation

Implement data quality checks during transfers:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook


def validate_transfer_quality(**context):
    hive_hook = HiveServer2Hook(hiveserver2_conn_id='hive_warehouse')

    # Check the record count for the partition that was just loaded
    result = hive_hook.get_first(
        f"SELECT COUNT(*) FROM warehouse.transferred_data WHERE ds = '{context['ds']}'"
    )

    record_count = result[0] if result else 0
    if record_count < 1000:
        raise ValueError(f"Transfer validation failed: only {record_count} records")

    context['task_instance'].xcom_push(key='record_count', value=record_count)


transfer_task = MySqlToHiveOperator(
    task_id='transfer_data',
    sql="SELECT * FROM source_data WHERE date_col = '{{ ds }}'",
    hive_table='warehouse.transferred_data',
    partition={'ds': '{{ ds }}'},
    dag=dag
)

validation_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_transfer_quality,
    dag=dag
)

transfer_task >> validation_task
```

## Advanced Transfer Patterns

### Incremental Loading

Implement incremental data loading strategies:

```python
from airflow.providers.apache.hive.operators.hive import HiveOperator

# Incremental load: pick up rows updated during the execution date window
incremental_load = MySqlToHiveOperator(
    task_id='incremental_customer_load',
    sql='''
        SELECT customer_id, name, email, updated_at
        FROM customers
        WHERE updated_at >= '{{ ds }}'
          AND updated_at < '{{ macros.ds_add(ds, 1) }}'
    ''',
    hive_table='warehouse.customers_staging',
    partition={'load_date': '{{ ds }}'},
    dag=dag
)

# Merge staging into production
merge_task = HiveOperator(
    task_id='merge_customer_data',
    hql='''
        INSERT OVERWRITE TABLE warehouse.customers
        SELECT DISTINCT
            COALESCE(prod.customer_id, stage.customer_id) AS customer_id,
            COALESCE(stage.name, prod.name) AS name,
            COALESCE(stage.email, prod.email) AS email,
            GREATEST(COALESCE(stage.updated_at, '1900-01-01'),
                     COALESCE(prod.updated_at, '1900-01-01')) AS updated_at
        FROM warehouse.customers prod
        FULL OUTER JOIN (
            SELECT * FROM warehouse.customers_staging
            WHERE load_date = '{{ ds }}'
        ) stage ON prod.customer_id = stage.customer_id;
    ''',
    dag=dag
)

incremental_load >> merge_task
```

### Multi-Source Data Integration

Combine data from multiple sources:

```python
# Transfer from multiple databases
mysql_transfer = MySqlToHiveOperator(
    task_id='mysql_orders',
    sql="SELECT * FROM orders WHERE date_col = '{{ ds }}'",
    hive_table='staging.mysql_orders',
    partition={'ds': '{{ ds }}', 'source': 'mysql'},
    dag=dag
)

mssql_transfer = MsSqlToHiveOperator(
    task_id='mssql_orders',
    sql="SELECT * FROM Orders WHERE DateCol = '{{ ds }}'",
    hive_table='staging.mssql_orders',
    partition={'ds': '{{ ds }}', 'source': 'mssql'},
    dag=dag
)

# Combine in Hive
combine_sources = HiveOperator(
    task_id='combine_order_sources',
    hql='''
        INSERT OVERWRITE TABLE warehouse.unified_orders
        PARTITION (ds='{{ ds }}')
        SELECT order_id, customer_id, amount, 'mysql' AS source_system
        FROM staging.mysql_orders WHERE ds='{{ ds }}'
        UNION ALL
        SELECT order_id, customer_id, amount, 'mssql' AS source_system
        FROM staging.mssql_orders WHERE ds='{{ ds }}';
    ''',
    dag=dag
)

[mysql_transfer, mssql_transfer] >> combine_sources
```

### Error Handling and Recovery

Implement robust error handling:

```python
from datetime import timedelta

from airflow.providers.apache.hive.hooks.hive import HiveCliHook


def handle_transfer_failure(context):
    task_instance = context['task_instance']

    # Clean up the partially loaded partition
    hook = HiveCliHook(hive_cli_conn_id='hive_warehouse')

    hook.run_cli(f'''
        ALTER TABLE warehouse.failed_transfers
        DROP IF EXISTS PARTITION (ds='{context["ds"]}', attempt='{task_instance.try_number}')
    ''')

    # Send notification (send_failure_notification is a project-specific helper)
    send_failure_notification(f"Transfer failed: {task_instance.task_id}")


resilient_transfer = MySqlToHiveOperator(
    task_id='resilient_transfer',
    sql="SELECT * FROM large_table WHERE date_col = '{{ ds }}'",
    hive_table='warehouse.large_table',
    partition={'ds': '{{ ds }}'},
    retries=3,
    retry_delay=timedelta(minutes=10),
    on_failure_callback=handle_transfer_failure,
    dag=dag
)
```