
# Data Transfer Operations

Comprehensive data movement capabilities between AWS services and external systems. Provides operators for transferring data between S3, Redshift, relational and NoSQL databases, FTP/SFTP servers, and HTTP APIs, with support for multiple file formats and transformation options.

## Capabilities

### S3 to Redshift Transfer

Transfer data from S3 to Redshift tables with support for COPY command options and data transformation.

```python { .api }
class S3ToRedshiftOperator(BaseOperator):
    def __init__(
        self,
        schema: str,
        table: str,
        s3_bucket: str,
        s3_key: str,
        redshift_conn_id: str = 'redshift_default',
        aws_conn_id: str = 'aws_default',
        verify: bool = None,
        wildcard_match: bool = False,
        copy_options: list = None,
        autocommit: bool = True,
        parameters: dict = None,
        **kwargs,
    ):
        """
        Transfer data from S3 to Redshift table.

        Parameters:
        - schema: Redshift schema name
        - table: Redshift table name
        - s3_bucket: S3 bucket containing source data
        - s3_key: S3 key path for source data
        - redshift_conn_id: Redshift connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - wildcard_match: Use wildcard matching for S3 keys
        - copy_options: Additional COPY command options
        - autocommit: Auto-commit the transaction
        - parameters: Additional parameters for the operation
        """
```

### Redshift to S3 Transfer

Export data from Redshift tables to S3 with support for UNLOAD command options and parallel processing.

```python { .api }
class RedshiftToS3Operator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        schema: str = None,
        table: str = None,
        select_query: str = None,
        redshift_conn_id: str = 'redshift_default',
        aws_conn_id: str = 'aws_default',
        verify: bool = None,
        unload_options: list = None,
        autocommit: bool = True,
        include_header: bool = False,
        **kwargs,
    ):
        """
        Transfer data from Redshift to S3.

        Parameters:
        - s3_bucket: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - schema: Redshift schema name (if using table)
        - table: Redshift table name (if using table)
        - select_query: Custom SELECT query (alternative to schema/table)
        - redshift_conn_id: Redshift connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - unload_options: Additional UNLOAD command options
        - autocommit: Auto-commit the transaction
        - include_header: Include column headers in output
        """
```

### SQL to S3 Transfer

Transfer SQL query results from any database to S3 with support for various file formats and partitioning.

```python { .api }
class SqlToS3Operator(BaseOperator):
    def __init__(
        self,
        query: str,
        s3_bucket: str,
        s3_key: str,
        sql_conn_id: str,
        aws_conn_id: str = 'aws_default',
        verify: bool = None,
        replace: bool = True,
        pd_kwargs: dict = None,
        index: bool = True,
        **kwargs,
    ):
        """
        Transfer SQL query results to S3.

        Parameters:
        - query: SQL query to execute
        - s3_bucket: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - sql_conn_id: SQL database connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - replace: Replace existing S3 object
        - pd_kwargs: Additional pandas parameters for data processing
        - index: Include DataFrame index in output
        """
```

### S3 to SQL Transfer

Transfer data from S3 to SQL databases with support for data type inference and table creation.

```python { .api }
class S3ToSqlOperator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        table: str,
        sql_conn_id: str,
        aws_conn_id: str = 'aws_default',
        verify: bool = None,
        wildcard_match: bool = False,
        copy_options: dict = None,
        **kwargs,
    ):
        """
        Transfer data from S3 to SQL database table.

        Parameters:
        - s3_bucket: S3 bucket containing source data
        - s3_key: S3 key path for source data
        - table: Destination table name
        - sql_conn_id: SQL database connection ID
        - aws_conn_id: AWS connection ID for S3 access
        - verify: SSL certificate verification
        - wildcard_match: Use wildcard matching for S3 keys
        - copy_options: Additional copy options
        """
```
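
As a quick illustration, here is a hedged usage sketch for `S3ToSqlOperator` built from the signature documented above. The bucket, key, table, and connection IDs are placeholders, `dag` is assumed to be defined as in the usage examples below, and a given provider release may expose a slightly different parameter set (for example, a required row parser), so treat this as a sketch rather than a drop-in snippet.

```python
from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator

# Load a staged CSV from S3 into a relational table (illustrative names).
load_staged_orders = S3ToSqlOperator(
    task_id='load_staged_orders',
    s3_bucket='integration-staging',        # source bucket (placeholder)
    s3_key='staging/{{ ds }}/orders.csv',   # source object (placeholder)
    table='staging_orders',                 # destination table (placeholder)
    sql_conn_id='postgres_prod',            # target database connection
    aws_conn_id='aws_default',
    dag=dag,                                # assumes a DAG object as in the examples below
)
```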

### DynamoDB to S3 Transfer

Export DynamoDB table data to S3 with support for parallel scans and data filtering.

```python { .api }
class DynamoDbToS3Operator(BaseOperator):
    def __init__(
        self,
        dynamodb_table_name: str,
        s3_bucket_name: str,
        s3_key: str,
        aws_conn_id: str = 'aws_default',
        dynamodb_scan_kwargs: dict = None,
        s3_key_type: str = 'table_name',
        **kwargs,
    ):
        """
        Export DynamoDB table to S3.

        Parameters:
        - dynamodb_table_name: DynamoDB table name
        - s3_bucket_name: S3 bucket for destination data
        - s3_key: S3 key path for destination data
        - aws_conn_id: AWS connection ID
        - dynamodb_scan_kwargs: Additional DynamoDB scan parameters
        - s3_key_type: S3 key generation strategy
        """
```

### File System Transfers

Transfer operations between S3 and local file systems or remote file servers.

```python { .api }
class LocalFilesystemToS3Operator(BaseOperator):
    def __init__(
        self,
        filename: str,
        dest_key: str,
        dest_bucket: str = None,
        aws_conn_id: str = 'aws_default',
        verify: bool = None,
        replace: bool = True,
        **kwargs,
    ):
        """
        Transfer local file to S3.

        Parameters:
        - filename: Local file path
        - dest_key: S3 destination key
        - dest_bucket: S3 destination bucket
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        - replace: Replace existing S3 object
        """

class S3ToFtpOperator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        ftp_path: str,
        ftp_conn_id: str = 'ftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Transfer S3 object to FTP server.

        Parameters:
        - s3_bucket: S3 source bucket
        - s3_key: S3 source key
        - ftp_path: FTP destination path
        - ftp_conn_id: FTP connection ID
        - aws_conn_id: AWS connection ID
        """

class FtpToS3Operator(BaseOperator):
    def __init__(
        self,
        ftp_path: str,
        s3_bucket: str,
        s3_key: str,
        ftp_conn_id: str = 'ftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Transfer file from FTP server to S3.

        Parameters:
        - ftp_path: FTP source path
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - ftp_conn_id: FTP connection ID
        - aws_conn_id: AWS connection ID
        """

class S3ToSftpOperator(BaseOperator):
    def __init__(
        self,
        sftp_path: str,
        s3_bucket: str,
        s3_key: str,
        sftp_conn_id: str = 'sftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Transfer S3 object to SFTP server.

        Parameters:
        - sftp_path: SFTP destination path
        - s3_bucket: S3 source bucket
        - s3_key: S3 source key
        - sftp_conn_id: SFTP connection ID
        - aws_conn_id: AWS connection ID
        """

class SftpToS3Operator(BaseOperator):
    def __init__(
        self,
        s3_bucket: str,
        s3_key: str,
        sftp_path: str,
        sftp_conn_id: str = 'sftp_default',
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Transfer file from SFTP server to S3.

        Parameters:
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - sftp_path: SFTP source path
        - sftp_conn_id: SFTP connection ID
        - aws_conn_id: AWS connection ID
        """
```
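
The FTP and SFTP operators appear in the usage examples below, so here is a hedged sketch of the remaining case, a local-file upload with `LocalFilesystemToS3Operator`. The file path, bucket, and key are placeholders, `dag` is assumed to exist as in the usage examples below, and the `local_to_s3` module path is an assumption based on the Amazon provider's package layout.

```python
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

# Upload a locally generated report to S3 (illustrative names).
upload_report = LocalFilesystemToS3Operator(
    task_id='upload_report',
    filename='/tmp/reports/{{ ds }}/summary.csv',  # local file produced by an upstream task
    dest_key='reports/{{ ds }}/summary.csv',
    dest_bucket='analytics-exports',
    replace=True,   # overwrite the object if it already exists
    dag=dag,        # assumes a DAG object as in the examples below
)
```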

### Web and API Transfers

Transfer data from web APIs and HTTP sources to S3.

```python { .api }
class HttpToS3Operator(BaseOperator):
    def __init__(
        self,
        endpoint: str,
        s3_bucket: str,
        s3_key: str,
        http_conn_id: str = 'http_default',
        aws_conn_id: str = 'aws_default',
        method: str = 'GET',
        headers: dict = None,
        **kwargs,
    ):
        """
        Transfer HTTP response data to S3.

        Parameters:
        - endpoint: HTTP endpoint URL
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - http_conn_id: HTTP connection ID
        - aws_conn_id: AWS connection ID
        - method: HTTP method (GET, POST, etc.)
        - headers: HTTP request headers
        """

class GoogleApiToS3Operator(BaseOperator):
    def __init__(
        self,
        google_api_service_name: str,
        google_api_service_version: str,
        google_api_endpoint_path: str,
        s3_bucket: str,
        s3_key: str,
        google_conn_id: str = 'google_cloud_default',
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        """
        Transfer Google API data to S3.

        Parameters:
        - google_api_service_name: Google API service name
        - google_api_service_version: Google API service version
        - google_api_endpoint_path: API endpoint path
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - google_conn_id: Google Cloud connection ID
        - aws_conn_id: AWS connection ID
        """
```
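
`HttpToS3Operator` appears in the multi-source example below; for `GoogleApiToS3Operator`, here is a hedged sketch that follows the parameter names documented above. The service, endpoint path, bucket, and key are illustrative, a `google_cloud_default` connection is assumed to exist, and a given provider release may name the destination parameters differently or require additional endpoint query parameters.

```python
from airflow.providers.amazon.aws.transfers.google_api_to_s3 import GoogleApiToS3Operator

# Pull a Google API response and land it in S3 (illustrative, per the signature above).
youtube_channels_to_s3 = GoogleApiToS3Operator(
    task_id='youtube_channels_to_s3',
    google_api_service_name='youtube',
    google_api_service_version='v3',
    google_api_endpoint_path='youtube.channels.list',
    s3_bucket='integration-staging',
    s3_key='sources/google/{{ ds }}/channels.json',
    google_conn_id='google_cloud_default',
    aws_conn_id='aws_default',
    dag=dag,  # assumes a DAG object as in the examples below
)
```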

### Database Transfers

Transfer data between S3 and various database systems.

```python { .api }
class MongoToS3Operator(BaseOperator):
    def __init__(
        self,
        mongo_collection: str,
        s3_bucket: str,
        s3_key: str,
        mongo_conn_id: str = 'mongo_default',
        aws_conn_id: str = 'aws_default',
        mongo_query: dict = None,
        **kwargs,
    ):
        """
        Transfer MongoDB collection data to S3.

        Parameters:
        - mongo_collection: MongoDB collection name
        - s3_bucket: S3 destination bucket
        - s3_key: S3 destination key
        - mongo_conn_id: MongoDB connection ID
        - aws_conn_id: AWS connection ID
        - mongo_query: MongoDB query filter
        """
```
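
A hedged sketch of a MongoDB export using the parameters documented above; the collection name, query filter, and S3 locations are illustrative, and `dag` is assumed to be defined as in the usage examples below.

```python
from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator

# Export documents matching a simple filter from MongoDB to S3 (illustrative names).
export_user_events = MongoToS3Operator(
    task_id='export_user_events',
    mongo_collection='user_events',
    mongo_query={'event_date': '{{ ds }}'},  # equality filter on a date field (placeholder)
    s3_bucket='integration-staging',
    s3_key='sources/mongo/{{ ds }}/user_events.json',
    mongo_conn_id='mongo_default',
    aws_conn_id='aws_default',
    dag=dag,  # assumes a DAG object as in the examples below
)
```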

## Usage Examples

### S3 to Redshift Data Pipeline

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('s3_to_redshift_pipeline', start_date=datetime(2023, 1, 1))

# Wait for data file in S3
wait_for_data = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='data-lake-bucket',
    bucket_key='raw-data/{{ ds }}/sales_data.csv',
    timeout=3600,
    dag=dag,
)

# Load data into Redshift
load_to_redshift = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='analytics',
    table='daily_sales',
    s3_bucket='data-lake-bucket',
    s3_key='raw-data/{{ ds }}/sales_data.csv',
    redshift_conn_id='redshift_prod',
    aws_conn_id='aws_default',
    copy_options=[
        "CSV",
        "IGNOREHEADER 1",
        "DATEFORMAT 'YYYY-MM-DD'",
        "TRUNCATECOLUMNS",
    ],
    dag=dag,
)

wait_for_data >> load_to_redshift
```

### Database Export to S3

```python
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

# Export PostgreSQL query results to S3 (assumes a `dag` object as in the pipeline above)
export_sales_data = SqlToS3Operator(
    task_id='export_sales_data',
    query="""
        SELECT
            date_trunc('day', order_date) as order_day,
            customer_id,
            product_id,
            quantity,
            price,
            total_amount
        FROM orders
        WHERE order_date >= '{{ ds }}'
          AND order_date < '{{ next_ds }}'
    """,
    s3_bucket='analytics-exports',
    s3_key='exports/sales/{{ ds }}/daily_sales.parquet',
    sql_conn_id='postgres_prod',
    aws_conn_id='aws_default',
    pd_kwargs={
        'dtype': {
            'customer_id': 'int64',
            'product_id': 'int64',
            'quantity': 'int32',
            'price': 'float64',
        }
    },
    dag=dag,
)
```

### Multi-Source Data Integration

```python
from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FtpToS3Operator
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

# Download files from multiple sources
ftp_download = FtpToS3Operator(
    task_id='download_from_ftp',
    ftp_path='/data/exports/customer_data.csv',
    s3_bucket='integration-staging',
    s3_key='sources/ftp/{{ ds }}/customer_data.csv',
    ftp_conn_id='partner_ftp',
    dag=dag,
)

api_download = HttpToS3Operator(
    task_id='download_from_api',
    endpoint='/api/v1/products?date={{ ds }}',
    s3_bucket='integration-staging',
    s3_key='sources/api/{{ ds }}/products.json',
    http_conn_id='partner_api',
    headers={'Authorization': 'Bearer {{ var.value.api_token }}'},
    dag=dag,
)

# Load all data into warehouse
load_customers = S3ToRedshiftOperator(
    task_id='load_customers',
    schema='staging',
    table='customers',
    s3_bucket='integration-staging',
    s3_key='sources/ftp/{{ ds }}/',
    wildcard_match=True,
    copy_options=["CSV", "IGNOREHEADER 1"],
    dag=dag,
)

# Parallel downloads, then load
[ftp_download, api_download] >> load_customers
```

### Data Export Pipeline

```python
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_sftp import S3ToSftpOperator

# Export aggregated data from Redshift
export_summary = RedshiftToS3Operator(
    task_id='export_summary',
    select_query="""
        SELECT
            region,
            product_category,
            DATE_TRUNC('month', sale_date) as month,
            SUM(revenue) as total_revenue,
            COUNT(*) as transaction_count
        FROM fact_sales
        WHERE sale_date >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
          AND sale_date < DATE_TRUNC('month', CURRENT_DATE)
        GROUP BY region, product_category, month
    """,
    s3_bucket='analytics-exports',
    s3_key='reports/monthly_summary_{{ ds }}.csv',
    redshift_conn_id='redshift_prod',
    unload_options=[
        "HEADER",
        "DELIMITER ','",
        "NULL AS ''",
    ],
    include_header=True,
    dag=dag,
)

# Send report to external partner
# (Redshift UNLOAD appends a part suffix such as `000` to the key prefix, hence `.csv000`)
send_to_partner = S3ToSftpOperator(
    task_id='send_to_partner',
    s3_bucket='analytics-exports',
    s3_key='reports/monthly_summary_{{ ds }}.csv000',
    sftp_path='/uploads/monthly_summary_{{ ds }}.csv',
    sftp_conn_id='partner_sftp',
    dag=dag,
)

export_summary >> send_to_partner
```

### DynamoDB Backup to S3

```python
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDbToS3Operator

# Daily backup of DynamoDB table
backup_user_sessions = DynamoDbToS3Operator(
    task_id='backup_user_sessions',
    dynamodb_table_name='user_sessions',
    s3_bucket_name='dynamodb-backups',
    s3_key='backups/user_sessions/{{ ds }}/sessions.json',
    aws_conn_id='aws_default',
    dynamodb_scan_kwargs={
        'FilterExpression': 'attribute_exists(session_id)',
        'ProjectionExpression': 'session_id, user_id, created_at, last_active',
    },
    dag=dag,
)
```

## Types

```python { .api }
# Transfer configuration types
class TransferConfig:
    source_conn_id: str
    dest_conn_id: str
    batch_size: int = 1000
    parallel: bool = False
    verify_data: bool = True

# S3 transfer options
class S3TransferOptions:
    multipart_threshold: int = 8 * 1024 * 1024  # 8MB
    max_concurrency: int = 10
    multipart_chunksize: int = 8 * 1024 * 1024
    use_threads: bool = True

# Redshift COPY options
class RedshiftCopyOptions:
    format: str = 'CSV'
    delimiter: str = ','
    quote_character: str = '"'
    escape_character: str = None
    null_as: str = ''
    ignore_header: int = 0
    date_format: str = 'auto'
    time_format: str = 'auto'
    ignore_blank_lines: bool = True
    truncate_columns: bool = False
    fill_record: bool = False
    blanks_as_null: bool = True
    empty_as_null: bool = True
    explicit_ids: bool = False
    acceptanydate: bool = False
    acceptinvchars: str = None
    max_error: int = 0
    compupdate: bool = True
    statupdate: bool = True

# File format types
class FileFormat:
    CSV = 'csv'
    JSON = 'json'
    PARQUET = 'parquet'
    AVRO = 'avro'
    ORC = 'orc'
    TSV = 'tsv'

# Compression types
class CompressionType:
    NONE = None
    GZIP = 'gzip'
    BZIP2 = 'bz2'
    LZOP = 'lzop'
    ZSTD = 'zstd'

# Transfer status
class TransferStatus:
    PENDING = 'pending'
    RUNNING = 'running'
    SUCCESS = 'success'
    FAILED = 'failed'
    CANCELLED = 'cancelled'
```
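
The option classes above are descriptive types; operators such as `S3ToRedshiftOperator` ultimately accept COPY options as plain strings through `copy_options`. The following hypothetical helper (not part of the provider) shows one way to turn structured settings in the spirit of `RedshiftCopyOptions` into that list.

```python
# Hypothetical helper: build a `copy_options` list of raw COPY clause strings
# from a few structured settings mirroring RedshiftCopyOptions above.
def build_copy_options(
    file_format: str = 'CSV',
    ignore_header: int = 0,
    date_format: str = 'auto',
    truncate_columns: bool = False,
    max_error: int = 0,
) -> list:
    options = [file_format]
    if ignore_header:
        options.append(f"IGNOREHEADER {ignore_header}")
    if date_format:
        options.append(f"DATEFORMAT '{date_format}'")
    if truncate_columns:
        options.append("TRUNCATECOLUMNS")
    if max_error:
        options.append(f"MAXERROR {max_error}")
    return options

# Produces ["CSV", "IGNOREHEADER 1", "DATEFORMAT 'YYYY-MM-DD'", "TRUNCATECOLUMNS"],
# matching the copy_options used in the S3 to Redshift pipeline example above.
daily_sales_copy_options = build_copy_options(
    ignore_header=1,
    date_format='YYYY-MM-DD',
    truncate_columns=True,
)
```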