Provider package for integrating Apache Airflow with Trino database for queries, data transfers, and connection management
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
URI validation and management for Trino data assets. Provides standardized handling of Trino URIs with proper format validation, default port configuration, and integration with Airflow's asset and dataset management systems.
Validates and sanitizes Trino URI format to ensure proper addressing of Trino resources.
def sanitize_uri(uri: SplitResult) -> SplitResult:
"""
Validate and sanitize Trino URI format.
Ensures the URI follows the proper trino:// format with required components:
- Host must be present
- Port defaults to 8080 if not specified
- Path must contain catalog, schema, and table names
Parameters:
- uri: SplitResult object from urllib.parse.urlsplit()
Returns:
SplitResult with validated and sanitized URI components
Raises:
ValueError: If URI format is invalid or missing required components
"""
pass

Trino URIs must follow the standardized format for proper asset identification:
trino://host:port/catalog/schema/table

trino://

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri
# Complete URI with port
uri_string = "trino://trino-cluster.example.com:8080/analytics/sales/daily_transactions"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"Sanitized URI: {sanitized_uri.geturl()}")

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri
# URI without port - will default to 8080
uri_string = "trino://trino-cluster.example.com/warehouse/customers/profiles"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"URI with default port: {sanitized_uri.geturl()}")
# Output: trino://trino-cluster.example.com:8080/warehouse/customers/profiles

from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri
try:
# Invalid URI - missing host
invalid_uri = urlsplit("trino:///catalog/schema/table")
sanitize_uri(invalid_uri)
except ValueError as e:
print(f"Validation error: {e}")
# Output: URI format trino:// must contain a host
try:
# Invalid URI - incomplete path
invalid_uri = urlsplit("trino://host:8080/catalog/schema")
sanitize_uri(invalid_uri)
except ValueError as e:
print(f"Validation error: {e}")
# Output: URI format trino:// must contain catalog, schema, and table names

The sanitized URIs integrate with Airflow's asset and dataset management for data lineage tracking:
from airflow import DAG, Dataset
from airflow.providers.trino.assets.trino import sanitize_uri
from urllib.parse import urlsplit
# Define Trino dataset
trino_uri = "trino://production-cluster:8080/analytics/sales/daily_revenue"
parsed_uri = urlsplit(trino_uri)
sanitized_uri = sanitize_uri(parsed_uri)
# Create Airflow dataset
sales_dataset = Dataset(sanitized_uri.geturl())

from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from datetime import datetime
# Define datasets
input_dataset = Dataset("trino://cluster:8080/raw/transactions/daily")
output_dataset = Dataset("trino://cluster:8080/analytics/sales/summary")
def process_sales_data():
    """Insert a per-day revenue summary into analytics.sales.summary.

    Runs a single INSERT ... SELECT over raw.transactions.daily (rows with
    transaction_time >= current_date) through a TrinoHook bound to the
    'trino_default' connection.
    """
    # One statement: group by day, total the amounts, count the rows.
    summary_insert = """
INSERT INTO analytics.sales.summary
SELECT
date_trunc('day', transaction_time) as date,
sum(amount) as total_revenue,
count(*) as transaction_count
FROM raw.transactions.daily
WHERE transaction_time >= current_date
GROUP BY 1
"""
    TrinoHook(trino_conn_id='trino_default').run(summary_insert)
dag = DAG(
'sales_processing',
start_date=datetime(2023, 1, 1),
schedule=[input_dataset], # Triggered by input dataset updates
catchup=False
)
process_task = PythonOperator(
task_id='process_sales',
python_callable=process_sales_data,
outlets=[output_dataset], # Produces output dataset
dag=dag
)

The asset URI handling is automatically registered through the provider configuration:
# From get_provider_info()
{
"asset-uris": [
{"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
],
"dataset-uris": [
{"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
]
}

This registration enables Airflow to automatically use the sanitization function for any trino:// URIs in asset and dataset definitions.
The sanitization function handles several error conditions:
# This will raise ValueError
uri = urlsplit("trino:///catalog/schema/table")
sanitize_uri(uri) # ValueError: URI format trino:// must contain a host

# This will raise ValueError - missing table name
uri = urlsplit("trino://host:8080/catalog/schema")
sanitize_uri(uri) # ValueError: URI format trino:// must contain catalog, schema, and table names
# This will raise ValueError - missing schema and table
uri = urlsplit("trino://host:8080/catalog")
sanitize_uri(uri) # ValueError: URI format trino:// must contain catalog, schema, and table names

# All of these are valid and will be properly sanitized:
# With explicit port
"trino://cluster.example.com:8080/catalog/schema/table"
# Without port (defaults to 8080)
"trino://cluster.example.com/catalog/schema/table"
# With complex names
"trino://trino-prod.company.com:8443/data_warehouse/customer_analytics/daily_active_users"
# With numeric components
"trino://trino01.internal:9000/db2023/schema_v2/table_001"trino:// prefixfrom urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri
def create_trino_dataset(uri_string: str):
    """Create a validated Trino dataset.

    Parses ``uri_string``, passes the result through ``sanitize_uri`` and
    wraps the sanitized URL in a ``Dataset``.

    Parameters:
    - uri_string: Trino URI string, e.g. "trino://cluster/analytics/sales/daily"

    Returns:
    Dataset built from the sanitized URI

    Raises:
    ValueError: If parsing or sanitization rejects the URI; the message
    includes the offending URI and the original error is attached as the cause
    """
    # Local import: this example's own import lines do not bring Dataset into scope.
    from airflow import Dataset

    try:
        uri = urlsplit(uri_string)
        sanitized_uri = sanitize_uri(uri)
        return Dataset(sanitized_uri.geturl())
    except ValueError as e:
        # Chain explicitly so the traceback reports the validation failure as the cause.
        raise ValueError(f"Invalid Trino URI '{uri_string}': {e}") from e
# Usage
try:
dataset = create_trino_dataset("trino://cluster/analytics/sales/daily")
print(f"Created dataset: {dataset.uri}")
except ValueError as e:
print(f"Dataset creation failed: {e}")

def test_trino_uri_sanitization():
"""Test URI sanitization behavior."""
from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri
# Test valid URI
uri = urlsplit("trino://host/cat/sch/tbl")
result = sanitize_uri(uri)
assert result.port == 8080
assert result.netloc == "host:8080"
# Test error cases
try:
sanitize_uri(urlsplit("trino:///cat/sch/tbl"))
assert False, "Should have raised ValueError"
except ValueError:
pass # Expected

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-trino