
tessl/pypi-apache-airflow-providers-trino

Provider package for integrating Apache Airflow with Trino for queries, data transfers, and connection management.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-trino@6.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-trino@6.3.0


Apache Airflow Providers Trino

A provider package that integrates Apache Airflow with Trino (formerly PrestoSQL) for database operations, data transfers, and connection management. This provider enables users to execute SQL queries against Trino clusters, transfer data from external sources like Google Cloud Storage to Trino tables, and manage Trino connections with various authentication methods.

Package Information

  • Package Name: apache-airflow-providers-trino
  • Language: Python
  • Installation: pip install apache-airflow-providers-trino

Core Imports

from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator

For provider metadata:

from airflow.providers.trino.get_provider_info import get_provider_info
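
A quick sketch of inspecting that metadata at runtime; the specific dictionary keys shown ("package-name", "versions") are assumptions about the returned structure, not guaranteed by this document:

from airflow.providers.trino.get_provider_info import get_provider_info

# Returns the provider's metadata dictionary
info = get_provider_info()
print(info.get("package-name"))  # assumed key: distribution name
print(info.get("versions"))      # assumed key: released provider versions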

Basic Usage

from airflow.providers.trino.hooks.trino import TrinoHook
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def query_trino():
    # Create Trino hook with connection
    hook = TrinoHook(trino_conn_id='trino_default')
    
    # Execute a simple query
    sql = "SELECT count(*) FROM catalog.schema.table"
    result = hook.get_records(sql)
    print(f"Query result: {result}")
    
    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT * FROM catalog.schema.table LIMIT 10")
    print(df.head())

# Define DAG
dag = DAG(
    'trino_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

# Add task
query_task = PythonOperator(
    task_id='query_trino',
    python_callable=query_trino,
    dag=dag
)

Architecture

The provider is built around three core components:

  • Hooks: Database connection and query execution through TrinoHook, supporting multiple authentication methods (Basic, JWT, Certificates, Kerberos)
  • Transfers: Data movement operators like GCSToTrinoOperator for loading external data into Trino tables
  • Assets: URI handling and validation for Trino resource references with proper catalog/schema/table addressing

This design enables comprehensive Trino integration within Airflow workflows, from simple query execution to complex data pipeline orchestration with external data sources.

Capabilities

Database Operations

Core database functionality for connecting to Trino clusters and executing SQL operations. Supports query execution, connection management, and multiple authentication methods including Basic, JWT, Certificates, and Kerberos.

class TrinoHook(DbApiHook):
    def get_conn(self) -> Connection: ...
    def get_records(self, sql: str, parameters=None) -> list: ...
    def get_first(self, sql: str, parameters=None) -> Any: ...
    def get_pandas_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame: ...
    def insert_rows(self, table: str, rows: Iterable[tuple], target_fields: Iterable[str] | None = None, commit_every: int = 0, replace: bool = False, **kwargs) -> None: ...
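
A brief usage sketch for the hook built from the signatures above; the table and column names are placeholders, not part of the package:

from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id="trino_default")

# Fetch a single row, e.g. a row count (placeholder table name)
row_count = hook.get_first("SELECT count(*) FROM catalog.schema.users")

# Bulk-insert rows, committing every 1000 rows (placeholder table and columns)
hook.insert_rows(
    table="catalog.schema.users",
    rows=[(1, "alice"), (2, "bob")],
    target_fields=["id", "name"],
    commit_every=1000,
)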

See docs/database-operations.md for details.

Data Transfers

Transfer operators for moving data from external sources into Trino tables. Currently supports Google Cloud Storage to Trino transfers with CSV file processing and flexible schema mapping.

class GCSToTrinoOperator(BaseOperator):
    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs
    ): ...
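
A minimal DAG sketch for this operator; the bucket, object, and table names are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator

with DAG(
    "gcs_to_trino_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    load_csv = GCSToTrinoOperator(
        task_id="load_users_csv",
        source_bucket="my-bucket",              # placeholder GCS bucket
        source_object="exports/users.csv",      # placeholder CSV object
        trino_table="catalog.schema.users",     # placeholder target table
        trino_conn_id="trino_default",
        gcp_conn_id="google_cloud_default",
        schema_fields=["id", "name", "email"],  # optional explicit column order
    )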

See docs/data-transfers.md for details.

Asset Management

URI validation and management for Trino data assets. Provides standardized handling of Trino URIs with proper format validation and default port configuration.

def sanitize_uri(uri: SplitResult) -> SplitResult: ...
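
An illustrative sketch only: the asset URI is assumed to take the form trino://host[:port]/catalog/schema/table, the import path is an assumption (see docs/asset-management.md), and the default-port behaviour is inferred from the description above:

from urllib.parse import urlsplit

# Assumed import path for the asset helpers
from airflow.providers.trino.assets.trino import sanitize_uri

raw = urlsplit("trino://trino.example.com/sales/public/orders")
clean = sanitize_uri(raw)
print(clean.geturl())  # expected to carry the default Trino port if none was given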

See docs/asset-management.md for details.

Connection Configuration

Trino connections support various authentication methods configured through Airflow connection extras:

  • Basic Authentication: Username/password via connection login and password fields
  • JWT Authentication: JSON Web Token via jwt__token or jwt__file extras
  • Certificate Authentication: Client certificates via certs__client_cert_path and certs__client_key_path extras
  • Kerberos Authentication: Kerberos configuration via various kerberos__* extras

Additional configuration options include session_properties, client_tags, and timezone settings.
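
A rough sketch of defining such a connection programmatically; the host, port, and extras values are placeholders, and the exact JSON layout expected by TrinoHook is an assumption based on the extras listed above:

import json

from airflow.models.connection import Connection

trino_conn = Connection(
    conn_id="trino_default",
    conn_type="trino",
    host="trino.example.com",  # placeholder host
    port=8080,                 # assumed default Trino port
    login="airflow_user",
    extra=json.dumps(
        {
            "jwt__token": "<jwt-token>",                       # JWT authentication
            "session_properties": {"query_max_run_time": "5m"},
            "client_tags": ["airflow", "etl"],
            "timezone": "UTC",
        }
    ),
)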

Types

# Type imports
from collections.abc import Iterable, Sequence
from urllib.parse import SplitResult

import pandas
import trino.auth
import trino.dbapi

class TrinoException(Exception):
    """Custom exception for Trino-related errors."""
    pass

# Connection and authentication types from trino package
Connection = trino.dbapi.Connection

# Authentication classes from trino.auth
BasicAuthentication = trino.auth.BasicAuthentication
JWTAuthentication = trino.auth.JWTAuthentication
CertificateAuthentication = trino.auth.CertificateAuthentication
KerberosAuthentication = trino.auth.KerberosAuthentication