Provider package for integrating Apache Airflow with Trino database for queries, data transfers, and connection management
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Core database functionality for connecting to Trino clusters and executing SQL operations. The TrinoHook provides comprehensive database interaction capabilities with support for multiple authentication methods, connection management, and various query execution patterns.
Establishes and manages connections to Trino clusters with comprehensive authentication support and configuration options.
class TrinoHook(DbApiHook):
"""
Interact with a Trino cluster through the ``trino`` Python package.

Class attributes (DbApiHook configuration):
- conn_name_attr: str = "trino_conn_id"  (keyword argument that carries the Airflow connection id)
- default_conn_name: str = "trino_default"  (connection id used when none is supplied)
- conn_type: str = "trino"
- hook_name: str = "Trino"
- strip_semicolon: bool = True
- query_id: str = ""  (presumably holds the id of the most recently executed query — confirm against implementation)
"""
def __init__(self, *args, **kwargs) -> None:
"""Initialize the TrinoHook. Arguments are accepted positionally/by keyword; presumably forwarded to DbApiHook — confirm."""
pass
def get_conn(self) -> Connection:
    """
    Build and return an authenticated Trino ``Connection`` object.

    The authentication scheme is chosen from the Airflow connection
    configuration; the supported schemes are:

    * basic authentication (username/password)
    * JWT authentication (inline token or token file)
    * client-certificate authentication
    * Kerberos authentication

    Returns:
        A ``Connection`` configured with the selected authentication.
    """
    pass
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Return custom field behaviour for the Airflow connection form UI.

    Returns:
        Dict with the UI field configuration for the connection form.
    """
    pass

# Execute SQL queries against Trino with various result formats and parameter binding support.
def get_records(self, sql: str, parameters=None) -> list:
    """
    Run *sql* and fetch every resulting record.

    Parameters:
    - sql: the SQL statement to execute
    - parameters: optional values bound to the query's placeholders

    Returns:
        A list of tuples containing the query results.
    """
    pass
def get_first(self, sql: str, parameters=None) -> Any:
    """
    Run *sql* and return only the first record of the result set.

    Parameters:
    - sql: the SQL statement to execute
    - parameters: optional values bound to the query's placeholders

    Returns:
        The first record as a tuple, or None when the query yields no rows.
    """
    pass
@deprecated(
    reason="Replaced by function `get_df`.",
    category=AirflowProviderDeprecationWarning,
    action="ignore",
)
def get_pandas_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame:
    """
    Execute *sql* and return the result set as a pandas DataFrame.

    Deprecated — use ``get_df()`` instead.

    Parameters:
    - sql: the SQL statement to execute
    - parameters: optional values bound to the query's placeholders
    - **kwargs: extra pandas read options

    Returns:
        pandas.DataFrame holding the query results.
    """
    pass
def get_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame | polars.DataFrame:
    """
    Execute *sql* and return the results as a DataFrame.

    Modern replacement for ``get_pandas_df()``; whether a pandas or a
    polars DataFrame is produced depends on the hook's configuration.

    Parameters:
    - sql: the SQL statement to execute
    - parameters: optional values bound to the query's placeholders
    - **kwargs: extra DataFrame read options

    Returns:
        pandas.DataFrame or polars.DataFrame holding the query results.
    """
    pass
def _get_pandas_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame:
    """
    Internal helper: execute *sql* and return a pandas DataFrame.

    Parameters:
    - sql: the SQL statement to execute
    - parameters: optional values bound to the query's placeholders
    - **kwargs: extra pandas read options

    Returns:
        pandas.DataFrame holding the query results.
    """
    pass
def _get_polars_df(self, sql: str = "", parameters=None, **kwargs) -> polars.DataFrame:
    """
    Internal helper: execute *sql* and return a polars DataFrame.

    Parameters:
    - sql: the SQL statement to execute
    - parameters: optional values bound to the query's placeholders
    - **kwargs: extra polars read options

    Returns:
        polars.DataFrame holding the query results.
    """
    pass

# Insert data into Trino tables with batch processing and transaction management.
def insert_rows(
    self,
    table: str,
    rows: Iterable[tuple],
    target_fields: Iterable[str] | None = None,
    commit_every: int = 0,
    replace: bool = False,
    **kwargs,
) -> None:
    """
    Insert a collection of rows into a Trino table.

    Parameters:
    - table: target table name
    - rows: iterable of tuples, one tuple per row
    - target_fields: names of the columns to fill in the table
    - commit_every: maximum rows to insert in one transaction (0 = all rows)
    - replace: whether to replace instead of insert
    - **kwargs: additional keyword arguments
    """
    pass

# Manage database transactions and isolation levels for consistent data operations.
def get_isolation_level(self) -> Any:
    """
    Return the current transaction isolation level setting.
    """
    pass

# Support for data lineage tracking through OpenLineage integration.
def get_openlineage_database_info(self, connection):
    """
    Build the database-info structure consumed by OpenLineage tracking.

    Parameters:
    - connection: the database connection object to describe

    Returns:
        Database info dict for lineage tracking.
    """
    pass
def get_openlineage_database_dialect(self, _):
    """
    Return the database dialect identifier reported to OpenLineage.

    The single positional argument is ignored.
    """
    pass
def get_openlineage_default_schema(self):
    """
    Return the default schema name used for OpenLineage tracking.
    """
    pass

# Generate connection URIs for external integrations and debugging.
def get_uri(self) -> str:
    """
    Assemble and return the URI string for the current Trino connection.
    """
    pass
@staticmethod
def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
    """
    Serialize a cell value for database insertion.

    Trino adapts all ``execute()`` arguments internally, hence the cell
    is returned without any conversion.

    Parameters:
    - cell: the cell value to insert into the table
    - conn: the database connection (optional, unused)

    Returns:
        The unmodified cell value.
    """
    pass

# The hook supports multiple authentication methods configured through Airflow connection extras:
Set connection login and password fields:
# Connection configuration
login = "username"
password = "password"

Configure JWT token in connection extras:
# Via token string
extra = {"auth": "jwt", "jwt__token": "your-jwt-token"}
# Via token file
extra = {"auth": "jwt", "jwt__file": "/path/to/token.jwt"}

Configure client certificates in connection extras:
extra = {
"auth": "certs",
"certs__client_cert_path": "/path/to/client.crt",
"certs__client_key_path": "/path/to/client.key"
}

Configure Kerberos settings in connection extras:
extra = {
"auth": "kerberos",
"kerberos__config": "/path/to/krb5.conf",
"kerberos__service_name": "trino",
"kerberos__mutual_authentication": True,
"kerberos__force_preemptive": False,
"kerberos__hostname_override": "trino.example.com",
"kerberos__principal": "user@REALM",
"kerberos__delegate": False,
"kerberos__ca_bundle": "/path/to/ca-bundle.crt"
}

from airflow.providers.trino.hooks.trino import TrinoHook
# Initialize hook
hook = TrinoHook(trino_conn_id='my_trino_conn')
# Execute simple query
sql = "SELECT count(*) FROM catalog.schema.table"
result = hook.get_records(sql)
print(f"Row count: {result[0][0]}")
# Get first result
first_row = hook.get_first("SELECT * FROM catalog.schema.table LIMIT 1")
print(f"First row: {first_row}")

import pandas as pd
import polars as pl
from airflow.providers.trino.hooks.trino import TrinoHook
hook = TrinoHook(trino_conn_id='my_trino_conn')
# Modern approach - get DataFrame (pandas or polars based on configuration)
sql = "SELECT id, name, value FROM catalog.schema.table LIMIT 100"
df = hook.get_df(sql)
# Or explicitly get pandas DataFrame
df_pandas = hook._get_pandas_df(sql)
# Or explicitly get polars DataFrame
df_polars = hook._get_polars_df(sql)
# Legacy approach (deprecated)
df_legacy = hook.get_pandas_df(sql) # Shows deprecation warning
# Process DataFrame
print(f"DataFrame shape: {df.shape}")
print(df.describe())

from airflow.providers.trino.hooks.trino import TrinoHook
hook = TrinoHook(trino_conn_id='my_trino_conn')
# Execute parameterized query
sql = "SELECT * FROM catalog.schema.table WHERE date >= ? AND status = ?"
params = ['2023-01-01', 'active']
results = hook.get_records(sql, parameters=params)

from airflow.providers.trino.hooks.trino import TrinoHook
hook = TrinoHook(trino_conn_id='my_trino_conn')
# Prepare data rows
rows = [
(1, 'Alice', 100.5),
(2, 'Bob', 200.0),
(3, 'Charlie', 150.75)
]
# Insert data
hook.insert_rows(
table='catalog.schema.target_table',
rows=rows,
target_fields=['id', 'name', 'value'],
commit_every=1000
)def generate_trino_client_info() -> str:
"""
Return JSON string with DAG context information.
Includes dag_id, task_id, logical_date/execution_date, try_number,
dag_run_id, and dag_owner from Airflow context.
Returns:
JSON string with task execution context
"""
passclass TrinoException(Exception):
"""
Custom exception for Trino-related errors.
Raised for Trino-specific issues and error conditions.
"""
passCommon error scenarios:
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-trino