Provider package for integrating Apache Airflow with Trino database for queries, data transfers, and connection management
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Transfer operators for moving data from external sources into Trino tables. The transfer functionality provides seamless data integration between various external storage systems and Trino databases, with built-in data processing and transformation capabilities.
Loads CSV files from Google Cloud Storage into Trino tables with flexible schema mapping and data processing options.
from collections.abc import Iterable, Sequence
from airflow.utils.context import Context
class GCSToTrinoOperator(BaseOperator):
    """
    Loads a CSV file from Google Cloud Storage into a Trino table.

    Assumptions:
    1. CSV file should not have headers
    2. Trino table with requisite columns is already created
    3. Optionally, a separate JSON file with headers can be provided

    :param source_bucket: Source GCS bucket that contains the CSV file.
    :param source_object: CSV file path including the path within the bucket.
    :param trino_table: Target Trino table name (catalog.schema.table format).
    :param trino_conn_id: Airflow connection ID for the Trino database.
    :param gcp_conn_id: Airflow connection ID for Google Cloud Platform.
    :param schema_fields: Names of the columns to fill in the table.
    :param schema_object: JSON file in GCS with the schema fields.
    :param impersonation_chain: Service account (or chain of accounts) to
        impersonate for GCS access.
    """

    # Rendered with Jinja before execution, per the class contract
    # ("Template Fields: source_bucket, source_object, trino_table").
    template_fields: Sequence[str] = ("source_bucket", "source_object", "trino_table")

    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        """Store the transfer configuration and register the task with Airflow."""
        # The original stub dropped **kwargs entirely, so BaseOperator.__init__
        # (which consumes task_id, dag, retries, ...) was never called and the
        # task could not be registered. Forward them here.
        super().__init__(**kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.trino_table = trino_table
        self.trino_conn_id = trino_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.schema_fields = schema_fields
        self.schema_object = schema_object
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context) -> None:
        """Execute the transfer operation.

        Stub in this spec document: the concrete download/insert logic lives in
        the provider implementation
        (airflow.providers.trino.transfers.gcs_to_trino).
        """
        pass
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

# Define a manually-triggered DAG (schedule_interval=None means no schedule).
dag = DAG(
    'gcs_to_trino_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

# Transfer a CSV from GCS to Trino using the default connection IDs.
transfer_task = GCSToTrinoOperator(
    task_id='load_data_to_trino',
    source_bucket='my-data-bucket',
    source_object='data/sales_data.csv',
    trino_table='analytics.sales.daily_sales',
    trino_conn_id='trino_default',
    gcp_conn_id='google_cloud_default',
    dag=dag,
)
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

# Daily DAG using environment-specific connection IDs.
dag = DAG(
    'custom_transfer',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)

# Transfer with custom connection IDs; the source path is templated with the
# logical date ({{ ds }}) so each run picks up that day's export.
transfer_data = GCSToTrinoOperator(
    task_id='transfer_daily_data',
    source_bucket='data-lake-bucket',
    source_object='exports/{{ ds }}/transactions.csv',
    trino_table='warehouse.transactions.daily',
    trino_conn_id='production_trino',
    gcp_conn_id='data_lake_gcp',
    dag=dag,
)
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime
dag = DAG(
'multi_table_transfer',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
catchup=False
)
# Transfer multiple datasets
tables_config = [
{
'source_object': 'exports/customers.csv',
'trino_table': 'crm.customers.daily_snapshot',
'task_id': 'load_customers'
},
{
'source_object': 'exports/orders.csv',
'trino_table': 'sales.orders.daily_batch',
'task_id': 'load_orders'
},
{
'source_object': 'exports/products.csv',
'trino_table': 'inventory.products.catalog',
'task_id': 'load_products'
}
]
transfer_tasks = []
for config in tables_config:
task = GCSToTrinoOperator(
task_id=config['task_id'],
source_bucket='enterprise-data-bucket',
source_object=config['source_object'],
trino_table=config['trino_table'],
trino_conn_id='trino_default',
gcp_conn_id='google_cloud_default',
dag=dag
)
transfer_tasks.append(task)
# Set up task dependencies if needed
# transfer_tasks[0] >> transfer_tasks[1] >> transfer_tasks[2]Before using the transfer operator, ensure the target Trino table exists with appropriate schema:
-- Example table creation in Trino.
-- The target table must exist before the transfer runs: the operator only
-- inserts rows, it does not create the table (see class assumption 2).
CREATE TABLE analytics.sales.daily_sales (
    transaction_id VARCHAR,
    customer_id VARCHAR,
    product_id VARCHAR,
    quantity INTEGER,
    price DECIMAL(10,2),
    transaction_date DATE,
    store_location VARCHAR
);
The CSV files in GCS should follow these guidelines:
Example CSV format:
TXN001,CUST123,PROD456,2,29.99,2023-01-15,Store_A
TXN002,CUST124,PROD457,1,49.99,2023-01-15,Store_B
TXN003,CUST125,PROD458,3,19.99,2023-01-15,Store_A
Configure Trino connection in Airflow with:
Configure GCP connection in Airflow with:
The GCSToTrinoOperator provides several data processing capabilities:
The transfer operators can be combined with other Airflow operators for complete data pipelines:
from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.operators.python import PythonOperator
from datetime import datetime
def validate_data():
    """Fail the pipeline if the target Trino table received no rows."""
    hook = TrinoHook(trino_conn_id='trino_default')
    # Count the rows that landed in the target table.
    row_count = hook.get_first(
        "SELECT count(*) FROM analytics.sales.daily_sales"
    )[0]
    if not row_count:
        raise ValueError("No data transferred to target table")
    print(f"Successfully transferred {row_count} rows")
dag = DAG(
'complete_transfer_pipeline',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
catchup=False
)
# Transfer data
transfer_task = GCSToTrinoOperator(
task_id='transfer_sales_data',
source_bucket='sales-data-bucket',
source_object='daily/{{ ds }}/sales.csv',
trino_table='analytics.sales.daily_sales',
dag=dag
)
# Validate transfer
validate_task = PythonOperator(
task_id='validate_transfer',
python_callable=validate_data,
dag=dag
)
# Set task dependencies
transfer_task >> validate_taskInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-trino