# Data Transfers

Transfer operators for moving data from external sources into Trino tables. The transfer functionality integrates various external storage systems with Trino, with built-in data processing and transformation options.

## Capabilities

### Google Cloud Storage to Trino Transfer

Loads CSV files from Google Cloud Storage into Trino tables, with flexible schema mapping and data processing options.

```python { .api }
from collections.abc import Iterable, Sequence

from airflow.models import BaseOperator
from airflow.utils.context import Context


class GCSToTrinoOperator(BaseOperator):
    """
    Loads a CSV file from Google Cloud Storage into a Trino table.

    Assumptions:
    1. CSV file should not have headers
    2. Trino table with requisite columns is already created
    3. Optionally, a separate JSON file with headers can be provided

    Template Fields: source_bucket, source_object, trino_table
    """

    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ):
        """
        Initialize the GCS to Trino transfer operator.

        Parameters:
        - source_bucket: Source GCS bucket that contains the CSV file
        - source_object: Path of the CSV file within the bucket
        - trino_table: Target Trino table name (catalog.schema.table format)
        - trino_conn_id: Airflow connection ID for the Trino database
        - gcp_conn_id: Airflow connection ID for Google Cloud Platform
        - schema_fields: Names of the columns to fill in the table
        - schema_object: JSON file with schema fields
        - impersonation_chain: Service account(s) to impersonate for GCS access
        - **kwargs: Additional BaseOperator parameters
        """
        pass

    def execute(self, context: Context) -> None:
        """Execute the transfer operation."""
        pass
```
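
For the optional schema parameters, here is a minimal sketch of both styles, assuming hypothetical bucket, object, and table names; `schema_fields` names the target columns inline, while `schema_object` points at a JSON file in GCS that lists them:

```python
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator

# Option 1: name the target columns inline
load_with_fields = GCSToTrinoOperator(
    task_id="load_with_schema_fields",
    source_bucket="my-data-bucket",
    source_object="data/sales_data.csv",
    trino_table="analytics.sales.daily_sales",
    schema_fields=["transaction_id", "customer_id", "quantity"],
)

# Option 2: point at a JSON file in the bucket that lists the column names
load_with_object = GCSToTrinoOperator(
    task_id="load_with_schema_object",
    source_bucket="my-data-bucket",
    source_object="data/sales_data.csv",
    trino_table="analytics.sales.daily_sales",
    schema_object="schemas/sales_schema.json",
)
```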

## Usage Examples

### Basic CSV Transfer

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

# Define DAG
dag = DAG(
    'gcs_to_trino_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Transfer CSV from GCS to Trino
transfer_task = GCSToTrinoOperator(
    task_id='load_data_to_trino',
    source_bucket='my-data-bucket',
    source_object='data/sales_data.csv',
    trino_table='analytics.sales.daily_sales',
    trino_conn_id='trino_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)
```

### Transfer with Custom Connections

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'custom_transfer',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Transfer with custom connection IDs
transfer_data = GCSToTrinoOperator(
    task_id='transfer_daily_data',
    source_bucket='data-lake-bucket',
    source_object='exports/{{ ds }}/transactions.csv',
    trino_table='warehouse.transactions.daily',
    trino_conn_id='production_trino',
    gcp_conn_id='data_lake_gcp',
    dag=dag
)
```

### Multiple File Transfer Pipeline

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'multi_table_transfer',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Transfer multiple datasets
tables_config = [
    {
        'source_object': 'exports/customers.csv',
        'trino_table': 'crm.customers.daily_snapshot',
        'task_id': 'load_customers'
    },
    {
        'source_object': 'exports/orders.csv',
        'trino_table': 'sales.orders.daily_batch',
        'task_id': 'load_orders'
    },
    {
        'source_object': 'exports/products.csv',
        'trino_table': 'inventory.products.catalog',
        'task_id': 'load_products'
    }
]

transfer_tasks = []
for config in tables_config:
    task = GCSToTrinoOperator(
        task_id=config['task_id'],
        source_bucket='enterprise-data-bucket',
        source_object=config['source_object'],
        trino_table=config['trino_table'],
        trino_conn_id='trino_default',
        gcp_conn_id='google_cloud_default',
        dag=dag
    )
    transfer_tasks.append(task)

# Set up task dependencies if needed
# transfer_tasks[0] >> transfer_tasks[1] >> transfer_tasks[2]
```

## Configuration Requirements

### Trino Table Preparation

Before using the transfer operator, ensure the target Trino table exists with an appropriate schema:

```sql
-- Example table creation in Trino
CREATE TABLE analytics.sales.daily_sales (
    transaction_id VARCHAR,
    customer_id VARCHAR,
    product_id VARCHAR,
    quantity INTEGER,
    price DECIMAL(10,2),
    transaction_date DATE,
    store_location VARCHAR
);
```

### GCS File Format

The CSV files in GCS should follow these guidelines:

1. **No Headers**: CSV files should not contain header rows
2. **Consistent Schema**: Column order should match the target table schema
3. **Proper Encoding**: Files should be UTF-8 encoded
4. **Clean Data**: Handle NULL values appropriately (empty strings, NULL keywords)

Example CSV format:

```csv
TXN001,CUST123,PROD456,2,29.99,2023-01-15,Store_A
TXN002,CUST124,PROD457,1,49.99,2023-01-15,Store_B
TXN003,CUST125,PROD458,3,19.99,2023-01-15,Store_A
```
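
As a quick check against these guidelines, a short sketch using Python's standard `csv` module writes a header-less, UTF-8 encoded file (file name and rows are hypothetical):

```python
import csv

# Hypothetical rows in the same column order as the target table
rows = [
    ("TXN001", "CUST123", "PROD456", 2, "29.99", "2023-01-15", "Store_A"),
    ("TXN002", "CUST124", "PROD457", 1, "49.99", "2023-01-15", "Store_B"),
]

# newline="" lets the csv module control line endings; no header row is written
with open("sales_data.csv", "w", newline="", encoding="utf-8") as f:
    csv.writer(f).writerows(rows)
```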

### Connection Configuration

#### Trino Connection

Configure the Trino connection in Airflow with:

- **Host**: Trino coordinator hostname
- **Port**: Trino coordinator port (default 8080)
- **Schema**: Default schema (optional)
- **Login**: Username for authentication
- **Password**: Password (if using basic auth)
- **Extra**: Additional authentication and configuration options

#### Google Cloud Connection

Configure the GCP connection in Airflow with the following (a combined sketch of both connections follows this list):

- **Project ID**: Google Cloud project containing the GCS buckets
- **Keyfile Path**: Service account key file path
- **Scopes**: Required GCS access scopes
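
One way to register both connections is through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment variables. The snippet below is a minimal sketch with hypothetical hosts and credentials; the exact extra-field names for the GCP connection vary across provider versions, so check the documentation for your installed providers:

```python
import os

# Hypothetical Trino connection: user, password, coordinator host and port
os.environ["AIRFLOW_CONN_TRINO_DEFAULT"] = (
    "trino://analyst:secret@trino-coordinator:8080"
)

# Hypothetical GCP connection: key file path and project passed as extras
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"] = (
    "google-cloud-platform://?key_path=%2Fkeys%2Fservice-account.json"
    "&project=my-gcp-project"
)
```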

## Data Processing Features

The GCSToTrinoOperator provides several data processing capabilities:

### Automatic Type Inference

- Attempts to infer column types from CSV data
- Handles common data types (strings, integers, decimals, dates)
- Provides fallback to string type for ambiguous data (see the sketch below)
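
As an illustration of the general technique (not the operator's actual internals), a simple inference pass might try progressively stricter parsers and fall back to string:

```python
from datetime import date

def infer_value(raw: str):
    """Try int, then float, then ISO date; fall back to the raw string."""
    for parse in (int, float, date.fromisoformat):
        try:
            return parse(raw)
        except ValueError:
            continue
    return raw  # ambiguous data stays a string

# [infer_value(v) for v in ["2", "29.99", "2023-01-15", "Store_A"]]
# -> [2, 29.99, datetime.date(2023, 1, 15), "Store_A"]
```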

### Data Validation

- Validates CSV structure against the target table schema
- Checks for column count mismatches (see the sketch below)
- Reports data quality issues in task logs
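
A column-count check along these lines can also be run independently before a transfer; this is an illustrative sketch, with the expected count a hypothetical value matching the target table:

```python
import csv

EXPECTED_COLUMNS = 7  # hypothetical: matches the daily_sales table above

def find_bad_rows(path: str) -> list[int]:
    """Return 1-based line numbers whose column count differs from the schema."""
    bad = []
    with open(path, newline="", encoding="utf-8") as f:
        for lineno, row in enumerate(csv.reader(f), start=1):
            if len(row) != EXPECTED_COLUMNS:
                bad.append(lineno)
    return bad
```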

### Error Handling

- Graceful handling of malformed CSV records
- Detailed error reporting for debugging
- Transaction rollback on failures

### Performance Optimization

- Batch processing for large files
- Memory-efficient streaming for large datasets
- Configurable batch sizes for optimal performance (see the sketch below)
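
For batch sizing specifically, DbApiHook-based hooks such as `TrinoHook` expose a `commit_every` argument on `insert_rows`; the sketch below uses hypothetical rows and assumes the table from the earlier SQL example:

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id="trino_default")

# Hypothetical pre-parsed rows in target-table column order
rows = [
    ("TXN001", "CUST123", "PROD456", 2, 29.99, "2023-01-15", "Store_A"),
    ("TXN002", "CUST124", "PROD457", 1, 49.99, "2023-01-15", "Store_B"),
]

# commit_every controls how many rows go into each INSERT batch
hook.insert_rows(
    table="analytics.sales.daily_sales",
    rows=rows,
    commit_every=1000,
)
```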

## Integration with Other Operators

The transfer operators can be combined with other Airflow operators for complete data pipelines:

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def validate_data():
    """Validate transferred data quality."""
    hook = TrinoHook(trino_conn_id='trino_default')

    # Check row count
    count_sql = "SELECT count(*) FROM analytics.sales.daily_sales"
    row_count = hook.get_first(count_sql)[0]

    if row_count == 0:
        raise ValueError("No data transferred to target table")

    print(f"Successfully transferred {row_count} rows")

dag = DAG(
    'complete_transfer_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Transfer data
transfer_task = GCSToTrinoOperator(
    task_id='transfer_sales_data',
    source_bucket='sales-data-bucket',
    source_object='daily/{{ ds }}/sales.csv',
    trino_table='analytics.sales.daily_sales',
    dag=dag
)

# Validate transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_data,
    dag=dag
)

# Set task dependencies
transfer_task >> validate_task
```