# Data Transfers

Transfer operators for moving data from external storage systems into Trino tables. The transfer functionality integrates external storage with Trino by handling the file download, optional schema mapping, and the load into the target table.

## Capabilities

### Google Cloud Storage to Trino Transfer

Loads CSV files from Google Cloud Storage into Trino tables. Column mapping can be supplied either as an explicit list of column names (`schema_fields`) or as a JSON schema object stored in GCS (`schema_object`).
```python { .api }
from collections.abc import Iterable, Sequence

from airflow.models import BaseOperator
from airflow.utils.context import Context


class GCSToTrinoOperator(BaseOperator):
    """
    Loads a CSV file from Google Cloud Storage into a Trino table.

    Assumptions:
    1. The CSV file should not have headers
    2. A Trino table with the requisite columns is already created
    3. Optionally, a separate JSON file with the column headers can be provided

    Template Fields: source_bucket, source_object, trino_table
    """

    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs
    ):
        """
        Initialize the GCS to Trino transfer operator.

        Parameters:
        - source_bucket: Source GCS bucket that contains the CSV file
        - source_object: Path to the CSV file within the bucket
        - trino_table: Target Trino table name (catalog.schema.table format)
        - trino_conn_id: Airflow connection ID for the Trino database
        - gcp_conn_id: Airflow connection ID for Google Cloud Platform
        - schema_fields: Names of the columns to fill in the table
        - schema_object: GCS object holding a JSON file with the schema fields
        - impersonation_chain: Service account(s) to impersonate for GCS access
        - **kwargs: Additional BaseOperator parameters
        """
        pass

    def execute(self, context: Context) -> None:
        """Execute the transfer operation."""
        pass
```
## Usage Examples

### Basic CSV Transfer

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

# Define DAG
dag = DAG(
    'gcs_to_trino_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Transfer CSV from GCS to Trino
transfer_task = GCSToTrinoOperator(
    task_id='load_data_to_trino',
    source_bucket='my-data-bucket',
    source_object='data/sales_data.csv',
    trino_table='analytics.sales.daily_sales',
    trino_conn_id='trino_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)
```
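### Transfer with Explicit Schema Mapping

The target columns can also be named explicitly instead of relying on the table's column order. The example below is a sketch: the bucket, object, and column names are illustrative, and it assumes that a schema object, if used instead of `schema_fields`, is a JSON file in GCS containing the list of column names.

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'gcs_to_trino_with_schema',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Pass the target column names explicitly
load_with_schema_fields = GCSToTrinoOperator(
    task_id='load_with_schema_fields',
    source_bucket='my-data-bucket',
    source_object='data/sales_data.csv',
    trino_table='analytics.sales.daily_sales',
    schema_fields=[
        'transaction_id',
        'customer_id',
        'product_id',
        'quantity',
        'price',
        'transaction_date',
        'store_location',
    ],
    dag=dag
)

# Alternatively, point the operator at a JSON object in the bucket that
# holds the column names (hypothetical object path)
load_with_schema_object = GCSToTrinoOperator(
    task_id='load_with_schema_object',
    source_bucket='my-data-bucket',
    source_object='data/sales_data.csv',
    trino_table='analytics.sales.daily_sales',
    schema_object='data/sales_data_schema.json',
    dag=dag
)
```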
### Transfer with Custom Connections

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'custom_transfer',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Transfer with custom connection IDs
transfer_data = GCSToTrinoOperator(
    task_id='transfer_daily_data',
    source_bucket='data-lake-bucket',
    source_object='exports/{{ ds }}/transactions.csv',
    trino_table='warehouse.transactions.daily',
    trino_conn_id='production_trino',
    gcp_conn_id='data_lake_gcp',
    dag=dag
)
```
### Multiple File Transfer Pipeline

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'multi_table_transfer',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Transfer multiple datasets
tables_config = [
    {
        'source_object': 'exports/customers.csv',
        'trino_table': 'crm.customers.daily_snapshot',
        'task_id': 'load_customers'
    },
    {
        'source_object': 'exports/orders.csv',
        'trino_table': 'sales.orders.daily_batch',
        'task_id': 'load_orders'
    },
    {
        'source_object': 'exports/products.csv',
        'trino_table': 'inventory.products.catalog',
        'task_id': 'load_products'
    }
]

transfer_tasks = []
for config in tables_config:
    task = GCSToTrinoOperator(
        task_id=config['task_id'],
        source_bucket='enterprise-data-bucket',
        source_object=config['source_object'],
        trino_table=config['trino_table'],
        trino_conn_id='trino_default',
        gcp_conn_id='google_cloud_default',
        dag=dag
    )
    transfer_tasks.append(task)

# Set up task dependencies if needed
# transfer_tasks[0] >> transfer_tasks[1] >> transfer_tasks[2]
```
## Configuration Requirements

### Trino Table Preparation

Before using the transfer operator, ensure the target Trino table exists with the appropriate schema:

```sql
-- Example table creation in Trino
CREATE TABLE analytics.sales.daily_sales (
    transaction_id VARCHAR,
    customer_id VARCHAR,
    product_id VARCHAR,
    quantity INTEGER,
    price DECIMAL(10,2),
    transaction_date DATE,
    store_location VARCHAR
);
```
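If the table is managed from the DAG itself rather than created ahead of time, the same DDL can be run from an upstream task. This is a minimal sketch, assuming the table definition above and a `trino_default` connection; the DAG and task names are illustrative.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from datetime import datetime

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS analytics.sales.daily_sales (
    transaction_id VARCHAR,
    customer_id VARCHAR,
    product_id VARCHAR,
    quantity INTEGER,
    price DECIMAL(10,2),
    transaction_date DATE,
    store_location VARCHAR
)
"""


def create_target_table():
    """Create the target table if it does not already exist."""
    hook = TrinoHook(trino_conn_id='trino_default')
    hook.run(CREATE_TABLE_SQL)


dag = DAG(
    'prepare_trino_table',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

create_table = PythonOperator(
    task_id='create_target_table',
    python_callable=create_target_table,
    dag=dag
)

# Chain this upstream of the transfer, e.g. create_table >> transfer_task
```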
### GCS File Format

The CSV files in GCS should follow these guidelines:

1. **No Headers**: CSV files should not contain header rows
2. **Consistent Schema**: Column order should match the target table schema
3. **Proper Encoding**: Files should be UTF-8 encoded
4. **Clean Data**: Handle NULL values appropriately (empty strings, NULL keywords)

Example CSV format:
```csv
TXN001,CUST123,PROD456,2,29.99,2023-01-15,Store_A
TXN002,CUST124,PROD457,1,49.99,2023-01-15,Store_B
TXN003,CUST125,PROD458,3,19.99,2023-01-15,Store_A
```
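To produce a file that follows these guidelines, the CSV can be written without a header row and with UTF-8 encoding using only the standard library. This is a minimal sketch; the file name and rows are illustrative, and missing values are written as empty strings per the guideline above.

```python
import csv

rows = [
    ("TXN001", "CUST123", "PROD456", 2, "29.99", "2023-01-15", "Store_A"),
    ("TXN002", "CUST124", "PROD457", 1, "49.99", "2023-01-15", "Store_B"),
    # A missing value is represented as an empty string
    ("TXN003", "CUST125", "PROD458", 3, "19.99", "2023-01-15", ""),
]

# newline='' is required by the csv module; the encoding is forced to UTF-8
with open("sales_data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerows(rows)  # no header row is written
```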
### Connection Configuration

#### Trino Connection
Configure the Trino connection in Airflow with:
- **Host**: Trino coordinator hostname
- **Port**: Trino coordinator port (default 8080)
- **Schema**: Default schema (optional)
- **Login**: Username for authentication
- **Password**: Password (if using basic auth)
- **Extra**: Additional authentication and configuration options

#### Google Cloud Connection
Configure the Google Cloud connection in Airflow with:
- **Project ID**: Google Cloud project containing the GCS buckets
- **Keyfile Path**: Path to the service account key file
- **Scopes**: Required GCS access scopes
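#### Example Connection Definitions

One way to capture these settings is as Airflow `Connection` objects, whose URIs can then be exported as `AIRFLOW_CONN_*` environment variables or entered through the UI or CLI. The sketch below is illustrative: host names, credentials, and key paths are placeholders, and the exact `extra` keys accepted by the Google Cloud connection vary between provider versions.

```python
import json

from airflow.models.connection import Connection

# Hypothetical Trino connection; adjust host, port, and credentials to your cluster
trino_conn = Connection(
    conn_id="trino_default",
    conn_type="trino",
    host="trino-coordinator.example.com",
    port=8080,
    login="airflow_user",
    password="secret",   # only needed for basic auth
    schema="analytics",  # optional default schema
)

# Hypothetical Google Cloud connection; key_path and project are placeholders
gcp_conn = Connection(
    conn_id="google_cloud_default",
    conn_type="google_cloud_platform",
    extra=json.dumps({
        "key_path": "/opt/airflow/keys/service-account.json",
        "project": "my-gcp-project",
    }),
)

# Each URI can be exported, e.g. as AIRFLOW_CONN_TRINO_DEFAULT and
# AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT, or created with `airflow connections add`.
print(trino_conn.get_uri())
print(gcp_conn.get_uri())
```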
## Data Processing Features

The GCSToTrinoOperator provides several data processing capabilities (see the pre-flight validation sketch at the end of this section):

### Automatic Type Inference
- Attempts to infer column types from CSV data
- Handles common data types (strings, integers, decimals, dates)
- Provides fallback to string type for ambiguous data

### Data Validation
- Validates CSV structure against the target table schema
- Checks for column count mismatches
- Reports data quality issues in task logs

### Error Handling
- Graceful handling of malformed CSV records
- Detailed error reporting for debugging
- Transaction rollback on failures

### Performance Optimization
- Batch processing for large files
- Memory-efficient streaming for large datasets
- Configurable batch sizes for optimal performance
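### Explicit Pre-Flight Validation

The validation described above can also be made explicit as a pre-flight task in the DAG. The sketch below is illustrative rather than part of the operator: it assumes the object fits in memory, downloads it with `GCSHook`, and compares the CSV column count against the target table's `DESCRIBE` output.

```python
import csv
import io

from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.trino.hooks.trino import TrinoHook


def preflight_check(bucket: str, obj: str, table: str) -> None:
    """Fail fast if the CSV column count does not match the target table."""
    gcs_hook = GCSHook(gcp_conn_id="google_cloud_default")
    trino_hook = TrinoHook(trino_conn_id="trino_default")

    # Download the object into memory (assumes a reasonably small file)
    data = gcs_hook.download(bucket_name=bucket, object_name=obj)
    reader = csv.reader(io.StringIO(data.decode("utf-8")))
    first_row = next(reader, None)
    if first_row is None:
        raise ValueError(f"gs://{bucket}/{obj} is empty")

    # DESCRIBE returns one row per column of the target table
    table_columns = trino_hook.get_records(f"DESCRIBE {table}")
    if len(first_row) != len(table_columns):
        raise ValueError(
            f"CSV has {len(first_row)} columns but {table} has {len(table_columns)}"
        )

# Wrap in a PythonOperator upstream of the transfer task, e.g.
# preflight_check('my-data-bucket', 'data/sales_data.csv', 'analytics.sales.daily_sales')
```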
## Integration with Other Operators

The transfer operators can be combined with other Airflow operators for complete data pipelines:

```python
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def validate_data():
    """Validate transferred data quality."""
    hook = TrinoHook(trino_conn_id='trino_default')

    # Check row count
    count_sql = "SELECT count(*) FROM analytics.sales.daily_sales"
    row_count = hook.get_first(count_sql)[0]

    if row_count == 0:
        raise ValueError("No data transferred to target table")

    print(f"Successfully transferred {row_count} rows")

dag = DAG(
    'complete_transfer_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

# Transfer data
transfer_task = GCSToTrinoOperator(
    task_id='transfer_sales_data',
    source_bucket='sales-data-bucket',
    source_object='daily/{{ ds }}/sales.csv',
    trino_table='analytics.sales.daily_sales',
    dag=dag
)

# Validate transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_data,
    dag=dag
)

# Set task dependencies
transfer_task >> validate_task
```