Provider package integrating Apache Airflow with the Trino query engine for SQL queries, data transfers, and connection management.
```
npx @tessl/cli install tessl/pypi-apache-airflow-providers-trino@6.3.0
```

A provider package that integrates Apache Airflow with Trino (formerly PrestoSQL) for database operations, data transfers, and connection management. This provider enables users to execute SQL queries against Trino clusters, transfer data from external sources such as Google Cloud Storage into Trino tables, and manage Trino connections with various authentication methods.
```
pip install apache-airflow-providers-trino
```

```python
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
```

For provider metadata:

```python
from airflow.providers.trino.get_provider_info import get_provider_info
```

```python
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def query_trino():
    # Create Trino hook with connection
    hook = TrinoHook(trino_conn_id='trino_default')

    # Execute a simple query
    sql = "SELECT count(*) FROM catalog.schema.table"
    result = hook.get_records(sql)
    print(f"Query result: {result}")

    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT * FROM catalog.schema.table LIMIT 10")
    print(df.head())

# Define DAG
dag = DAG(
    'trino_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)

# Add task
query_task = PythonOperator(
    task_id='query_trino',
    python_callable=query_trino,
    dag=dag,
)
```

The provider is built around three core components:

- **Hooks**: `TrinoHook` for connecting to Trino clusters and executing SQL
- **Transfer operators**: `GCSToTrinoOperator` for loading data from external sources into Trino tables
- **Assets**: `sanitize_uri` for validating Trino data asset URIs
This design enables comprehensive Trino integration within Airflow workflows, from simple query execution to complex data pipeline orchestration with external data sources.
**TrinoHook** provides core database functionality for connecting to Trino clusters and executing SQL operations. It supports query execution, connection management, and multiple authentication methods, including Basic, JWT, Certificate, and Kerberos.
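A minimal sketch of typical hook calls (the `trino_default` connection and the table names are placeholders; the full method stubs follow below):

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id="trino_default")

# Fetch only the first row of a result set
latest = hook.get_first("SELECT max(updated_at) FROM catalog.schema.events")

# Bulk-insert rows in batches of 1000
hook.insert_rows(
    table="catalog.schema.targets",
    rows=[(1, "alpha"), (2, "beta")],
    target_fields=["id", "name"],
    commit_every=1000,
)
```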
```python
class TrinoHook(DbApiHook):
    def get_conn(self) -> Connection: ...
    def get_records(self, sql: str, parameters=None) -> list: ...
    def get_first(self, sql: str, parameters=None) -> Any: ...
    def get_pandas_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame: ...
    def insert_rows(
        self,
        table: str,
        rows: Iterable[tuple],
        target_fields: Iterable[str] | None = None,
        commit_every: int = 0,
        replace: bool = False,
        **kwargs,
    ) -> None: ...
```

**GCSToTrinoOperator** is a transfer operator for moving data from external sources into Trino tables. It currently supports Google Cloud Storage to Trino transfers, with CSV file processing and flexible schema mapping.
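A minimal usage sketch within a DAG definition (bucket, object, and table names are placeholders; the referenced Airflow connections must already exist):

```python
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator

load_csv = GCSToTrinoOperator(
    task_id="gcs_to_trino",
    source_bucket="my-bucket",              # placeholder GCS bucket
    source_object="exports/data.csv",       # CSV object to load
    trino_table="catalog.schema.target",    # fully qualified Trino table
    trino_conn_id="trino_default",
    gcp_conn_id="google_cloud_default",
    schema_fields=["id", "name", "value"],  # or pass schema_object instead
)
```

The full constructor signature: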
```python
class GCSToTrinoOperator(BaseOperator):
    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ): ...
```

The assets module provides URI validation and management for Trino data assets: standardized handling of Trino URIs with format validation and default port configuration.
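A sketch of how the `sanitize_uri` helper (stub below) might be used; the import path and the default port of 8080 are assumptions based on the provider's asset module layout, not confirmed by this document:

```python
from urllib.parse import urlsplit

from airflow.providers.trino.assets.trino import sanitize_uri  # assumed import path

parts = urlsplit("trino://trino-host/catalog/schema/table")
normalized = sanitize_uri(parts)
print(normalized.geturl())  # trino://trino-host:8080/catalog/schema/table (assumed default port)
```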
```python
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```

Trino connections support several authentication methods, configured through Airflow connection fields and extras:
- **Basic**: connection `login` and `password`
- **JWT**: `jwt__token` or `jwt__file` extras
- **Certificates**: `certs__client_cert_path` and `certs__client_key_path` extras
- **Kerberos**: `kerberos__*` extras

Additional configuration options include `session_properties`, `client_tags`, and `timezone` settings.
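For example, a JWT-authenticated connection could be defined in code (a sketch; the host and token are placeholders, and the extras follow the field names listed above):

```python
import json

from airflow.models import Connection

conn = Connection(
    conn_id="trino_default",
    conn_type="trino",
    host="trino.example.com",  # placeholder host
    port=443,
    login="analyst",
    extra=json.dumps(
        {
            "jwt__token": "<token>",  # placeholder JWT
            "session_properties": {"query_max_run_time": "5m"},
            "client_tags": ["airflow"],
            "timezone": "UTC",
        }
    ),
)
print(conn.get_uri())  # URI form, e.g. for the AIRFLOW_CONN_TRINO_DEFAULT environment variable
```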
```python
# Type imports
from collections.abc import Iterable, Sequence
from urllib.parse import SplitResult

import pandas
import trino.auth
import trino.dbapi


class TrinoException(Exception):
    """Custom exception for Trino-related errors."""


# Connection and authentication types from the trino package
Connection = trino.dbapi.Connection

# Authentication classes from trino.auth
BasicAuthentication = trino.auth.BasicAuthentication
JWTAuthentication = trino.auth.JWTAuthentication
CertificateAuthentication = trino.auth.CertificateAuthentication
KerberosAuthentication = trino.auth.KerberosAuthentication
```
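For reference, these authentication objects come from the underlying `trino` client and can be used against it directly (a sketch with the raw client rather than the Airflow hook; host and credentials are placeholders):

```python
import trino.auth
import trino.dbapi

conn = trino.dbapi.connect(
    host="trino.example.com",  # placeholder host
    port=443,
    user="analyst",
    http_scheme="https",
    auth=trino.auth.BasicAuthentication("analyst", "secret"),
)
cur = conn.cursor()
cur.execute("SELECT 1")
print(cur.fetchall())
```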