tessl/pypi-apache-airflow-providers-trino

Provider package for integrating Apache Airflow with Trino database for queries, data transfers, and connection management

Asset Management

URI validation and normalization for Trino data assets. The module checks that a Trino URI is well formed, applies the default port when none is given, and registers itself as the handler Airflow uses for trino:// URIs in asset and dataset definitions.

Capabilities

URI Sanitization

Validates and sanitizes Trino URI format to ensure proper addressing of Trino resources.

from urllib.parse import SplitResult

def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize Trino URI format.
    
    Ensures the URI follows the proper trino:// format with required components:
    - Host must be present
    - Port defaults to 8080 if not specified  
    - Path must contain catalog, schema, and table names
    
    Parameters:
    - uri: SplitResult object from urllib.parse.urlsplit()
    
    Returns:
    SplitResult with validated and sanitized URI components
    
    Raises:
    ValueError: If URI format is invalid or missing required components
    """
    pass
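
The rules in the docstring can be sketched in plain Python. This is an illustrative stand-in, not the provider's source: the name `sketch_sanitize_uri` is hypothetical, and it assumes only the behaviour documented on this page (host check, default port 8080, three path segments, and the two error messages shown in the examples).

```python
from urllib.parse import SplitResult, urlsplit


def sketch_sanitize_uri(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in that mirrors the documented rules."""
    # Rule 1: a host is required.
    if not uri.netloc:
        raise ValueError("URI format trino:// must contain a host")
    # Rule 2: fill in the default port when none is given.
    if uri.port is None:
        host = uri.netloc.rstrip(":")
        uri = uri._replace(netloc=f"{host}:8080")
    # Rule 3: the path must look like /catalog/schema/table.
    # Splitting on "/" yields an empty first item for the leading slash.
    segments = uri.path.split("/")
    if len(segments) != 4:
        raise ValueError("URI format trino:// must contain catalog, schema, and table names")
    return uri


result = sketch_sanitize_uri(urlsplit("trino://host/cat/sch/tbl"))
print(result.geturl())
```

Because `SplitResult` is a named tuple, `_replace` returns a new object and leaves the caller's input untouched.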

URI Format Requirements

Trino URIs must follow the standardized format for proper asset identification:

Standard Format

trino://host:port/catalog/schema/table

Components

  • Scheme: Must be trino://
  • Host: Trino coordinator hostname (required)
  • Port: Trino coordinator port (defaults to 8080 if not specified)
  • Path: Must contain exactly three path segments:
    1. Catalog name
    2. Schema name
    3. Table name
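
To see how these components map onto a concrete URI, the sketch below splits one with the standard library. `TrinoTableRef` and `split_trino_uri` are hypothetical names introduced for illustration; they are not part of the provider package.

```python
from typing import NamedTuple
from urllib.parse import urlsplit


class TrinoTableRef(NamedTuple):
    """Hypothetical container for the parts of a Trino asset URI."""
    host: str
    port: int
    catalog: str
    schema: str
    table: str


def split_trino_uri(uri_string: str) -> TrinoTableRef:
    parts = urlsplit(uri_string)
    if parts.scheme != "trino":
        raise ValueError(f"Expected trino:// scheme, got {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("Host is required")
    # Drop surrounding slashes, then expect exactly three non-empty segments.
    segments = parts.path.strip("/").split("/")
    if len(segments) != 3 or not all(segments):
        raise ValueError("Path must be /catalog/schema/table")
    catalog, schema, table = segments
    # Fall back to the documented default port when none is present.
    return TrinoTableRef(parts.hostname, parts.port or 8080, catalog, schema, table)


ref = split_trino_uri("trino://trino-cluster.example.com/analytics/sales/daily_transactions")
print(ref)
```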

Usage Examples

Basic URI Sanitization

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# Complete URI with port
uri_string = "trino://trino-cluster.example.com:8080/analytics/sales/daily_transactions"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"Sanitized URI: {sanitized_uri.geturl()}")

URI with Default Port

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# URI without port - will default to 8080
uri_string = "trino://trino-cluster.example.com/warehouse/customers/profiles"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"URI with default port: {sanitized_uri.geturl()}")
# Output: URI with default port: trino://trino-cluster.example.com:8080/warehouse/customers/profiles

URI Validation Error Handling

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

try:
    # Invalid URI - missing host
    invalid_uri = urlsplit("trino:///catalog/schema/table")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Validation error: {e}")
    # Output: Validation error: URI format trino:// must contain a host

try:
    # Invalid URI - incomplete path
    invalid_uri = urlsplit("trino://host:8080/catalog/schema")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Validation error: {e}")
    # Output: Validation error: URI format trino:// must contain catalog, schema, and table names

Integration with Airflow Assets

The sanitized URIs integrate with Airflow's asset and dataset management for data lineage tracking:

Asset Definition

from urllib.parse import urlsplit

from airflow import Dataset
from airflow.providers.trino.assets.trino import sanitize_uri

# Define Trino dataset
trino_uri = "trino://production-cluster:8080/analytics/sales/daily_revenue"
parsed_uri = urlsplit(trino_uri)
sanitized_uri = sanitize_uri(parsed_uri)

# Create Airflow dataset
sales_dataset = Dataset(sanitized_uri.geturl())

Asset-Aware DAG

from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from datetime import datetime

# Define datasets
input_dataset = Dataset("trino://cluster:8080/raw/transactions/daily")
output_dataset = Dataset("trino://cluster:8080/analytics/sales/summary")

def process_sales_data():
    hook = TrinoHook(trino_conn_id='trino_default')
    
    # Transform data
    sql = """
    INSERT INTO analytics.sales.summary
    SELECT 
        date_trunc('day', transaction_time) as date,
        sum(amount) as total_revenue,
        count(*) as transaction_count
    FROM raw.transactions.daily
    WHERE transaction_time >= current_date
    GROUP BY 1
    """
    
    hook.run(sql)

dag = DAG(
    'sales_processing',
    start_date=datetime(2023, 1, 1),
    schedule=[input_dataset],  # Triggered by input dataset updates
    catchup=False
)

process_task = PythonOperator(
    task_id='process_sales',
    python_callable=process_sales_data,
    outlets=[output_dataset],  # Produces output dataset
    dag=dag
)

Provider Registration

The asset URI handling is automatically registered through the provider configuration:

# From get_provider_info()
{
    "asset-uris": [
        {"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
    ],
    "dataset-uris": [
        {"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
    ]
}

This registration enables Airflow to automatically use the sanitization function for any trino:// URIs in asset and dataset definitions.
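
The idea of mapping a URI scheme to a handler can be illustrated with a small registry. This is only a conceptual sketch: `register`, `normalize`, and `add_default_port` are hypothetical names, and Airflow's actual lookup mechanism is internal and may differ.

```python
from typing import Callable
from urllib.parse import SplitResult, urlsplit

# Hypothetical registry keyed by URI scheme.
Handler = Callable[[SplitResult], SplitResult]
_handlers: dict[str, Handler] = {}


def register(schemes: list[str], handler: Handler) -> None:
    """Associate each scheme with a sanitizing handler."""
    for scheme in schemes:
        _handlers[scheme] = handler


def normalize(uri_string: str) -> str:
    """Run the handler registered for the URI's scheme, if any."""
    parts = urlsplit(uri_string)
    handler = _handlers.get(parts.scheme)
    # Schemes without a registered handler pass through unchanged.
    return handler(parts).geturl() if handler else uri_string


def add_default_port(uri: SplitResult) -> SplitResult:
    """Stand-in handler: applies only the documented default port."""
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:8080")
    return uri


register(["trino"], add_default_port)
print(normalize("trino://cluster/analytics/sales/summary"))
print(normalize("s3://bucket/key"))
```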

Error Scenarios

The sanitization function raises ValueError in the following cases:

Missing Host

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# This will raise ValueError
uri = urlsplit("trino:///catalog/schema/table")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain a host

Incomplete Path

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# This will raise ValueError - missing table name
uri = urlsplit("trino://host:8080/catalog/schema")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain catalog, schema, and table names

# This will raise ValueError - missing schema and table
uri = urlsplit("trino://host:8080/catalog")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain catalog, schema, and table names
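
When many URIs need checking at once, collecting the errors can be more useful than stopping at the first one. The sketch below takes the sanitizer as a parameter so it runs without Airflow installed. `partition_uris` and `require_host` are hypothetical names; in real use you would presumably pass the provider's `sanitize_uri` instead of the stand-in.

```python
from typing import Callable, Iterable
from urllib.parse import SplitResult, urlsplit


def partition_uris(uris: Iterable[str], sanitizer: Callable[[SplitResult], SplitResult]):
    """Return (valid, errors): sanitized URI strings and a {uri: message} dict."""
    valid, errors = [], {}
    for raw in uris:
        try:
            valid.append(sanitizer(urlsplit(raw)).geturl())
        except ValueError as exc:
            errors[raw] = str(exc)
    return valid, errors


def require_host(uri: SplitResult) -> SplitResult:
    """Stand-in sanitizer for this demo; it checks only the host rule."""
    if not uri.netloc:
        raise ValueError("URI format trino:// must contain a host")
    return uri


valid, errors = partition_uris(
    ["trino://host:8080/cat/sch/tbl", "trino:///cat/sch/tbl"],
    require_host,
)
print(valid)
print(errors)
```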

Valid URI Examples

# All of these are valid and will be properly sanitized:

# With explicit port
"trino://cluster.example.com:8080/catalog/schema/table"

# Without port (defaults to 8080)
"trino://cluster.example.com/catalog/schema/table"

# With complex names
"trino://trino-prod.company.com:8443/data_warehouse/customer_analytics/daily_active_users"

# With numeric components
"trino://trino01.internal:9000/db2023/schema_v2/table_001"

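As a quick check of how the URIs above parse, the following sketch runs them through `urlsplit` from the standard library and inspects the host, port, and path segments. It does not call the provider; a port of `None` marks the case where the documented default of 8080 would apply.

```python
from urllib.parse import urlsplit

uris = [
    "trino://cluster.example.com:8080/catalog/schema/table",
    "trino://cluster.example.com/catalog/schema/table",
    "trino://trino-prod.company.com:8443/data_warehouse/customer_analytics/daily_active_users",
    "trino://trino01.internal:9000/db2023/schema_v2/table_001",
]

rows = []
for raw in uris:
    parts = urlsplit(raw)
    # Remove the leading slash, then split into catalog, schema, and table.
    segments = parts.path.lstrip("/").split("/")
    rows.append((parts.hostname, parts.port, len(segments)))
    print(parts.hostname, parts.port, segments)
```
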
Best Practices

URI Construction

  1. Always include scheme: Use trino:// prefix
  2. Specify host clearly: Use fully qualified domain names when possible
  3. Rely on the default port: Omit the port when the coordinator listens on the standard 8080
  4. Follow naming conventions: Use consistent catalog/schema/table naming
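
One way to apply these guidelines consistently is to assemble URIs through a single helper rather than by string concatenation. `build_trino_uri` below is a hypothetical helper, not part of the provider; it uses only `urllib.parse.urlunsplit` and omits the port unless one is passed explicitly.

```python
from typing import Optional
from urllib.parse import urlunsplit


def build_trino_uri(host: str, catalog: str, schema: str, table: str,
                    port: Optional[int] = None) -> str:
    """Hypothetical helper that assembles trino://host[:port]/catalog/schema/table."""
    for name, value in (("host", host), ("catalog", catalog),
                        ("schema", schema), ("table", table)):
        # A slash inside a component would change the number of path segments.
        if not value or "/" in value:
            raise ValueError(f"{name} must be non-empty and must not contain '/'")
    netloc = host if port is None else f"{host}:{port}"
    return urlunsplit(("trino", netloc, f"/{catalog}/{schema}/{table}", "", ""))


print(build_trino_uri("trino-prod.company.com", "analytics", "sales", "daily"))
print(build_trino_uri("trino01.internal", "db2023", "schema_v2", "table_001", port=9000))
```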

Error Handling

from urllib.parse import urlsplit

from airflow import Dataset
from airflow.providers.trino.assets.trino import sanitize_uri

def create_trino_dataset(uri_string: str) -> Dataset:
    """Create a validated Trino dataset."""
    try:
        uri = urlsplit(uri_string)
        sanitized_uri = sanitize_uri(uri)
        return Dataset(sanitized_uri.geturl())
    except ValueError as e:
        # Chain the original error so the root cause stays in the traceback
        raise ValueError(f"Invalid Trino URI '{uri_string}': {e}") from e

# Usage
try:
    dataset = create_trino_dataset("trino://cluster/analytics/sales/daily")
    print(f"Created dataset: {dataset.uri}")
except ValueError as e:
    print(f"Dataset creation failed: {e}")

Integration Testing

def test_trino_uri_sanitization():
    """Test URI sanitization behavior."""
    from urllib.parse import urlsplit

    import pytest
    from airflow.providers.trino.assets.trino import sanitize_uri

    # Test valid URI: the default port is applied
    uri = urlsplit("trino://host/cat/sch/tbl")
    result = sanitize_uri(uri)
    assert result.port == 8080
    assert result.netloc == "host:8080"

    # Test error case: a missing host must raise ValueError
    with pytest.raises(ValueError):
        sanitize_uri(urlsplit("trino:///cat/sch/tbl"))

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-trino
