tessl/pypi-apache-airflow-providers-trino

Provider package for integrating Apache Airflow with Trino database for queries, data transfers, and connection management

Data Transfers

Transfer operators for moving data from external storage systems into Trino tables. Each operator downloads the source data through the configured Airflow connections, applies any configured schema mapping, and inserts the resulting rows into the target Trino table.

Capabilities

Google Cloud Storage to Trino Transfer

Loads CSV files from Google Cloud Storage into Trino tables with flexible schema mapping and data processing options.

from collections.abc import Iterable, Sequence

from airflow.models import BaseOperator
from airflow.utils.context import Context

class GCSToTrinoOperator(BaseOperator):
    """
    Loads a CSV file from Google Cloud Storage into a Trino table.
    
    Assumptions:
    1. CSV file should not have headers
    2. Trino table with requisite columns is already created
    3. Optionally, a separate JSON file with headers can be provided
    
    Template Fields: source_bucket, source_object, trino_table
    """
    
    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs
    ):
        """
        Initialize GCS to Trino transfer operator.
        
        Parameters:
        - source_bucket: Source GCS bucket that contains the CSV file
        - source_object: CSV file path including the path within the bucket
        - trino_table: Target Trino table name (catalog.schema.table format)
        - trino_conn_id: Airflow connection ID for Trino database
        - gcp_conn_id: Airflow connection ID for Google Cloud Platform
        - schema_fields: Names of columns to fill in the table
        - schema_object: JSON file with schema fields
        - impersonation_chain: Service account to impersonate for GCS access
        - **kwargs: Additional BaseOperator parameters
        """
        pass
    
    def execute(self, context: Context) -> None:
        """Execute the transfer operation."""
        pass

Usage Examples

Basic CSV Transfer

from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

# Define DAG
dag = DAG(
    'gcs_to_trino_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

# Transfer CSV from GCS to Trino
transfer_task = GCSToTrinoOperator(
    task_id='load_data_to_trino',
    source_bucket='my-data-bucket',
    source_object='data/sales_data.csv',
    trino_table='analytics.sales.daily_sales',
    trino_conn_id='trino_default',
    gcp_conn_id='google_cloud_default',
    dag=dag
)

Transfer with Custom Connections

from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'custom_transfer',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False
)

# Transfer with custom connection IDs
transfer_data = GCSToTrinoOperator(
    task_id='transfer_daily_data',
    source_bucket='data-lake-bucket',
    source_object='exports/{{ ds }}/transactions.csv',
    trino_table='warehouse.transactions.daily',
    trino_conn_id='production_trino',
    gcp_conn_id='data_lake_gcp',
    dag=dag
)

Multiple File Transfer Pipeline

from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from datetime import datetime

dag = DAG(
    'multi_table_transfer',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False
)

# Transfer multiple datasets
tables_config = [
    {
        'source_object': 'exports/customers.csv',
        'trino_table': 'crm.customers.daily_snapshot',
        'task_id': 'load_customers'
    },
    {
        'source_object': 'exports/orders.csv', 
        'trino_table': 'sales.orders.daily_batch',
        'task_id': 'load_orders'
    },
    {
        'source_object': 'exports/products.csv',
        'trino_table': 'inventory.products.catalog',
        'task_id': 'load_products'
    }
]

transfer_tasks = []
for config in tables_config:
    task = GCSToTrinoOperator(
        task_id=config['task_id'],
        source_bucket='enterprise-data-bucket',
        source_object=config['source_object'],
        trino_table=config['trino_table'],
        trino_conn_id='trino_default',
        gcp_conn_id='google_cloud_default',
        dag=dag
    )
    transfer_tasks.append(task)

# Set up task dependencies if needed
# transfer_tasks[0] >> transfer_tasks[1] >> transfer_tasks[2]

Configuration Requirements

Trino Table Preparation

Before using the transfer operator, ensure the target Trino table exists with appropriate schema:

-- Example table creation in Trino
CREATE TABLE analytics.sales.daily_sales (
    transaction_id VARCHAR,
    customer_id VARCHAR,
    product_id VARCHAR,
    quantity INTEGER,
    price DECIMAL(10,2),
    transaction_date DATE,
    store_location VARCHAR
);

GCS File Format

The CSV files in GCS should follow these guidelines:

  1. No Headers: CSV files should not contain header rows
  2. Consistent Schema: Column order should match target table schema
  3. Proper Encoding: Files should be UTF-8 encoded
  4. Clean Data: Handle NULL values appropriately (empty strings, NULL keywords)

Example CSV format:

TXN001,CUST123,PROD456,2,29.99,2023-01-15,Store_A
TXN002,CUST124,PROD457,1,49.99,2023-01-15,Store_B
TXN003,CUST125,PROD458,3,19.99,2023-01-15,Store_A
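Files in this shape can be produced with Python's standard csv module, whose writer emits no header row by default. A minimal sketch, mirroring the example rows above (the in-memory buffer stands in for a file you would upload to GCS; names and values are illustrative):

```python
import csv
import io

# Rows in the same column order as the target table; no header row.
rows = [
    ["TXN001", "CUST123", "PROD456", 2, "29.99", "2023-01-15", "Store_A"],
    ["TXN002", "CUST124", "PROD457", 1, "49.99", "2023-01-15", "Store_B"],
]

# Write to an in-memory buffer; in practice this would be a file
# uploaded to the source GCS bucket.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(rows)  # writerows emits data rows only, no header

csv_text = buf.getvalue()
print(csv_text)
```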

Connection Configuration

Trino Connection

Configure Trino connection in Airflow with:

  • Host: Trino coordinator hostname
  • Port: Trino coordinator port (default 8080)
  • Schema: Default schema (optional)
  • Login: Username for authentication
  • Password: Password (if using basic auth)
  • Extra: Additional authentication and configuration options
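Besides the Airflow UI, connections can be supplied through `AIRFLOW_CONN_<CONN_ID>` environment variables in URI form. A sketch of building such a URI for the Trino connection with only the standard library (host, user, and password are placeholders; special characters must be percent-encoded):

```python
from urllib.parse import quote

# Placeholder credentials for the "trino_default" connection id.
user = "analyst"
password = "s3cret/pw"  # the "/" must be percent-encoded in a URI
host = "trino-coordinator.example.com"
port = 8080

# URI form understood by the AIRFLOW_CONN_* env-var convention:
# conn-type://login:password@host:port/schema
uri = (
    f"trino://{quote(user, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/"
)
print(f"AIRFLOW_CONN_TRINO_DEFAULT={uri}")
```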

Google Cloud Connection

Configure GCP connection in Airflow with:

  • Project ID: Google Cloud project containing the GCS buckets
  • Keyfile Path: Service account key file path
  • Scopes: Required GCS access scopes
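The GCP connection can be supplied the same way; in the URI form, query parameters become connection extras. A hedged sketch — the extra-field names (`key_path`, `project`) are those accepted by recent Google provider versions, while older versions expected an `extra__google_cloud_platform__` prefix, so check the docs for your installed provider:

```python
from urllib.parse import urlencode

# Placeholder values; query params map to the connection's extras.
extras = {
    "key_path": "/opt/airflow/keys/service-account.json",
    "project": "my-gcp-project",
}
uri = "google-cloud-platform://?" + urlencode(extras)
print(f"AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT={uri}")
```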

Data Processing Features

The GCSToTrinoOperator provides several data processing capabilities:

Automatic Type Inference

  • Attempts to infer column types from CSV data
  • Handles common data types (strings, integers, decimals, dates)
  • Provides fallback to string type for ambiguous data

Data Validation

  • Validates CSV structure against target table schema
  • Checks for column count mismatches
  • Reports data quality issues in task logs

Error Handling

  • Graceful handling of malformed CSV records
  • Detailed error reporting for debugging
  • Transaction rollback on failures

Performance Optimization

  • Batch processing for large files
  • Memory-efficient streaming for large datasets
  • Configurable batch sizes for optimal performance
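The operator's actual internals are not shown here, but the batching idea behind these points can be sketched with a generic chunking helper (names and sizes are illustrative, not operator parameters):

```python
from itertools import islice
from typing import Iterable, Iterator

def batched(rows: Iterable[tuple], batch_size: int) -> Iterator[list[tuple]]:
    """Yield rows in fixed-size batches so each INSERT stays bounded."""
    it = iter(rows)
    while batch := list(islice(it, batch_size)):
        yield batch

# Example: 7 parsed CSV rows inserted in batches of 3 -> sizes 3, 3, 1.
rows = [(i, f"row-{i}") for i in range(7)]
batches = list(batched(rows, 3))
print([len(b) for b in batches])
```

Streaming the rows through a generator like this keeps memory use independent of the file size, which is the property the bullets above describe.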

Integration with Other Operators

The transfer operators can be combined with other Airflow operators for complete data pipelines:

from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def validate_data():
    """Validate transferred data quality."""
    hook = TrinoHook(trino_conn_id='trino_default')
    
    # Check row count
    count_sql = "SELECT count(*) FROM analytics.sales.daily_sales"
    row_count = hook.get_first(count_sql)[0]
    
    if row_count == 0:
        raise ValueError("No data transferred to target table")
    
    print(f"Successfully transferred {row_count} rows")

dag = DAG(
    'complete_transfer_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False
)

# Transfer data
transfer_task = GCSToTrinoOperator(
    task_id='transfer_sales_data',
    source_bucket='sales-data-bucket',
    source_object='daily/{{ ds }}/sales.csv',
    trino_table='analytics.sales.daily_sales',
    dag=dag
)

# Validate transfer
validate_task = PythonOperator(
    task_id='validate_transfer',
    python_callable=validate_data,
    dag=dag
)

# Set task dependencies
transfer_task >> validate_task

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-trino
