
# Data Transfer Operations

Extensive transfer capabilities between Google services and external systems, including AWS S3, Azure Blob Storage, SFTP servers, local filesystems, and various databases. These operators enable data movement across cloud platforms and on-premises systems.

## Capabilities

### Cloud Storage Transfers

Transfer operations to and from Google Cloud Storage with various external systems.

```python { .api }
class GCSToBigQueryOperator(BaseOperator):
    """
    Transfers data from Google Cloud Storage to BigQuery tables.

    Args:
        bucket (str): GCS bucket name
        source_objects (List[str]): List of GCS object paths
        destination_project_dataset_table (str): BigQuery destination in format project.dataset.table
        schema_fields (List[Dict]): Table schema definition
        write_disposition (str): Write mode (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
        source_format (str): Source data format (CSV, JSON, AVRO, PARQUET)
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        BigQuery job result
    """
    def __init__(
        self,
        bucket: str,
        source_objects: List[str],
        destination_project_dataset_table: str,
        schema_fields: Optional[List[Dict]] = None,
        write_disposition: str = "WRITE_EMPTY",
        source_format: str = "CSV",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class BigQueryToGCSOperator(BaseOperator):
    """
    Transfers data from BigQuery to Google Cloud Storage.

    Args:
        source_project_dataset_table (str): BigQuery source table
        destination_cloud_storage_uris (List[str]): GCS destination URIs
        compression (str): Output compression format
        export_format (str): Export format (CSV, JSON, AVRO, PARQUET)
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        Export job result with file locations
    """
    def __init__(
        self,
        source_project_dataset_table: str,
        destination_cloud_storage_uris: List[str],
        compression: str = "NONE",
        export_format: str = "CSV",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class S3ToGCSOperator(BaseOperator):
    """
    Transfers objects from Amazon S3 to Google Cloud Storage.

    Args:
        bucket (str): S3 bucket name
        prefix (str): S3 object key prefix
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        aws_conn_id (str): Connection ID for Amazon Web Services
        dest_gcs_conn_id (str): Destination GCS connection ID
        dest_bucket (str): Destination GCS bucket name
        dest_prefix (str): Destination GCS object prefix
        replace (bool): Whether to replace existing objects

    Returns:
        List of transferred object keys
    """
    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        gcp_conn_id: str = "google_cloud_default",
        aws_conn_id: str = "aws_default",
        dest_gcs_conn_id: Optional[str] = None,
        dest_bucket: Optional[str] = None,
        dest_prefix: str = "",
        replace: bool = True,
        **kwargs
    ): ...

class AzureBlobStorageToGCSOperator(BaseOperator):
    """
    Transfers blobs from Azure Blob Storage to Google Cloud Storage.

    Args:
        blob_name (str): Azure blob name or prefix
        file_path (str): Destination GCS object path
        container_name (str): Azure container name
        bucket_name (str): Destination GCS bucket name
        azure_conn_id (str): Connection ID for Azure Blob Storage
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        GCS object path of transferred data
    """
    def __init__(
        self,
        blob_name: str,
        file_path: str,
        container_name: str,
        bucket_name: str,
        azure_conn_id: str = "azure_blob_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
```
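
The Usage Examples section later in this document covers the S3 and BigQuery paths but not the Azure one, so here is a minimal sketch of an Azure-blob-to-GCS task that follows the constructor signature shown above. The DAG id, container, bucket, and object names are placeholders, and the import path is an assumption based on the provider layout used elsewhere in this document; verify it against your installed provider version.

```python
from datetime import datetime

from airflow import DAG
# Import path assumed from the google provider package layout; adjust for your provider version.
from airflow.providers.google.cloud.transfers.azure_blob_to_gcs import AzureBlobStorageToGCSOperator

with DAG(
    'azure_blob_to_gcs_example',  # hypothetical DAG id
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Copy a single Azure blob into a GCS bucket (all names are placeholders).
    azure_to_gcs = AzureBlobStorageToGCSOperator(
        task_id='azure_to_gcs',
        blob_name='exports/{{ ds }}/events.json',
        container_name='analytics-exports',
        bucket_name='processed-data-lake',
        file_path='azure/{{ ds }}/events.json',
        azure_conn_id='azure_blob_default',
        gcp_conn_id='google_cloud_default',
    )
```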

### Database Transfers

Transfer operations between various databases and Google Cloud services.

```python { .api }
class MySQLToGCSOperator(BaseOperator):
    """
    Transfers data from MySQL database to Google Cloud Storage.

    Args:
        sql (str): SQL query to execute
        bucket (str): Destination GCS bucket name
        filename (str): Destination GCS object path
        schema_filename (str): Optional schema file path in GCS
        mysql_conn_id (str): Connection ID for MySQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        GCS object path of exported data
    """
    def __init__(
        self,
        sql: str,
        bucket: str,
        filename: str,
        schema_filename: Optional[str] = None,
        mysql_conn_id: str = "mysql_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class PostgresToGCSOperator(BaseOperator):
    """
    Transfers data from PostgreSQL database to Google Cloud Storage.

    Args:
        sql (str): SQL query to execute
        bucket (str): Destination GCS bucket name
        filename (str): Destination GCS object path
        schema_filename (str): Optional schema file path in GCS
        postgres_conn_id (str): Connection ID for PostgreSQL database
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        GCS object path of exported data
    """
    def __init__(
        self,
        sql: str,
        bucket: str,
        filename: str,
        schema_filename: Optional[str] = None,
        postgres_conn_id: str = "postgres_default",
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class BigQueryToPostgresOperator(BaseOperator):
    """
    Transfers data from BigQuery to PostgreSQL database.

    Args:
        dataset_table (str): BigQuery source table
        target_table_name (str): PostgreSQL destination table name
        postgres_conn_id (str): Connection ID for PostgreSQL database
        bigquery_conn_id (str): Connection ID for BigQuery

    Returns:
        Number of rows transferred
    """
    def __init__(
        self,
        dataset_table: str,
        target_table_name: str,
        postgres_conn_id: str = "postgres_default",
        bigquery_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...
```
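
The usage examples below only exercise the database-to-GCS direction, so here is a minimal sketch of the reverse path with BigQueryToPostgresOperator, following the signature above. The DAG id, table names, and connection IDs are placeholders, and the import path is an assumption based on the provider layout used elsewhere in this document.

```python
from datetime import datetime

from airflow import DAG
# Import path assumed from the google provider package layout; adjust for your provider version.
from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator

with DAG(
    'bq_to_postgres_example',  # hypothetical DAG id
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Push aggregated BigQuery results into an operational PostgreSQL table (placeholder names).
    bq_to_postgres = BigQueryToPostgresOperator(
        task_id='bq_to_postgres',
        dataset_table='processed_data.user_metrics',
        target_table_name='user_metrics',
        postgres_conn_id='postgres_default',
        bigquery_conn_id='google_cloud_default',
    )
```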

### File System Transfers

Transfer operations between GCS and various file systems.

```python { .api }
class GCSToLocalFilesystemOperator(BaseOperator):
    """
    Downloads objects from Google Cloud Storage to local filesystem.

    Args:
        bucket (str): GCS bucket name
        object_name (str): GCS object path
        filename (str): Local filesystem destination path
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        Local file path of downloaded object
    """
    def __init__(
        self,
        bucket: str,
        object_name: str,
        filename: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

class SFTPToGCSOperator(BaseOperator):
    """
    Transfers files from SFTP server to Google Cloud Storage.

    Args:
        source_path (str): SFTP source file path
        destination_bucket (str): GCS destination bucket name
        destination_path (str): GCS destination object path
        sftp_conn_id (str): Connection ID for SFTP server
        gcp_conn_id (str): Connection ID for Google Cloud Platform
        move_object (bool): Whether to delete source after transfer

    Returns:
        GCS object path of transferred file
    """
    def __init__(
        self,
        source_path: str,
        destination_bucket: str,
        destination_path: str,
        sftp_conn_id: str = "sftp_default",
        gcp_conn_id: str = "google_cloud_default",
        move_object: bool = False,
        **kwargs
    ): ...
```
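
For completeness, here is a minimal sketch of an SFTP-to-GCS task built from the signature above. The paths, bucket name, and connection IDs are placeholders, and the import path is an assumption based on the provider layout used elsewhere in this document.

```python
from datetime import datetime

from airflow import DAG
# Import path assumed from the google provider package layout; adjust for your provider version.
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

with DAG(
    'sftp_to_gcs_example',  # hypothetical DAG id
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Upload the day's drop file from an SFTP server into GCS, leaving the source in place.
    sftp_to_gcs = SFTPToGCSOperator(
        task_id='sftp_to_gcs',
        source_path='/uploads/{{ ds }}/orders.csv',
        destination_bucket='processed-data-lake',
        destination_path='sftp/{{ ds }}/orders.csv',
        sftp_conn_id='sftp_default',
        gcp_conn_id='google_cloud_default',
        move_object=False,
    )
```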

## Usage Examples

### Multi-Stage ETL Pipeline

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime

dag = DAG(
    'multi_cloud_etl',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Transfer raw data from S3 to GCS
s3_to_gcs = S3ToGCSOperator(
    task_id='s3_to_gcs',
    bucket='source-data-bucket',
    prefix='raw-data/{{ ds }}/',
    dest_bucket='processed-data-lake',
    dest_prefix='staging/{{ ds }}/',
    aws_conn_id='aws_default',
    dag=dag
)

# Load data into BigQuery for processing
gcs_to_bq = GCSToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='processed-data-lake',
    source_objects=['staging/{{ ds }}/*.csv'],
    destination_project_dataset_table='analytics.raw_data.daily_imports',
    schema_fields=[
        {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'event_type', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'properties', 'type': 'JSON', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)

# Export processed results
bq_to_gcs = BigQueryToGCSOperator(
    task_id='bq_to_gcs',
    source_project_dataset_table='analytics.processed_data.user_metrics',
    destination_cloud_storage_uris=[
        'gs://processed-data-lake/exports/{{ ds }}/user_metrics.parquet'
    ],
    export_format='PARQUET',
    dag=dag
)

s3_to_gcs >> gcs_to_bq >> bq_to_gcs
```

### Database Migration Pipeline

```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

dag = DAG(
    'database_migration',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Extract data from MySQL
mysql_to_gcs = MySQLToGCSOperator(
    task_id='mysql_to_gcs',
    sql='''
        SELECT
            customer_id,
            order_date,
            total_amount,
            status,
            created_at,
            updated_at
        FROM orders
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    bucket='data-migration',
    filename='orders/{{ ds }}/orders.csv',
    mysql_conn_id='mysql_prod',
    dag=dag
)

# Load into BigQuery
gcs_to_bigquery = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket='data-migration',
    source_objects=['orders/{{ ds }}/orders.csv'],
    destination_project_dataset_table='warehouse.sales.orders',
    schema_fields=[
        {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'order_date', 'type': 'DATE', 'mode': 'REQUIRED'},
        {'name': 'total_amount', 'type': 'NUMERIC', 'mode': 'REQUIRED'},
        {'name': 'status', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'updated_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_APPEND',
    dag=dag
)

mysql_to_gcs >> gcs_to_bigquery
```

## Types

```python { .api }
from typing import List, Optional, Dict, Any, Union
from airflow.models import BaseOperator

# Transfer operation types
SourcePath = str
DestinationPath = str
BucketName = str
ObjectKey = str
TableReference = str

# Schema and format types
SchemaField = Dict[str, str]
WriteDisposition = str  # WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
SourceFormat = str  # CSV, JSON, AVRO, PARQUET
ExportFormat = str  # CSV, JSON, AVRO, PARQUET
CompressionType = str  # NONE, GZIP, DEFLATE, SNAPPY

# Connection types
ConnectionId = str
TransferResult = Dict[str, Any]
```
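
As a brief illustration of how these aliases line up with the operator arguments above (the field names are hypothetical), a schema definition is simply a list of SchemaField dictionaries:

```python
from typing import Dict, List

SchemaField = Dict[str, str]

# Hypothetical schema matching the schema_fields argument of GCSToBigQueryOperator.
events_schema: List[SchemaField] = [
    {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
    {'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'properties', 'type': 'JSON', 'mode': 'NULLABLE'},
]
```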