tessl/pypi-apache-airflow-providers-trino

Provider package for integrating Apache Airflow with the Trino database for queries, data transfers, and connection management


Database Operations

Core database functionality for connecting to Trino clusters and executing SQL operations. The TrinoHook provides database interaction with support for multiple authentication methods, connection management, and a range of query execution patterns.

Capabilities

Connection Management

Establishes and manages connections to Trino clusters with comprehensive authentication support and configuration options.

class TrinoHook(DbApiHook):
    """
    Interact with Trino through trino package.
    
    Attributes:
    - conn_name_attr: str = "trino_conn_id"
    - default_conn_name: str = "trino_default"
    - conn_type: str = "trino"
    - hook_name: str = "Trino"
    - strip_semicolon: bool = True
    - query_id: str = ""
    """
    
    def __init__(self, *args, **kwargs):
        """Initialize the TrinoHook."""
        pass
    
    def get_conn(self) -> Connection:
        """
        Return a connection object with proper authentication.
        
        Supports multiple authentication methods:
        - Basic authentication (username/password)
        - JWT authentication (token or file)
        - Certificate authentication (client certs)
        - Kerberos authentication
        
        Returns:
        Connection object configured with specified authentication
        """
        pass
    
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return custom field behaviour for Airflow UI.
        
        Returns:
        Dict with UI field configuration for connection form
        """
        pass

Query Execution

Execute SQL queries against Trino with various result formats and parameter binding support.

def get_records(
    self,
    sql: str,
    parameters=None
) -> list:
    """
    Execute query and return all records.
    
    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    
    Returns:
    List of tuples containing query results
    """
    pass

def get_first(
    self,
    sql: str,
    parameters=None
) -> Any:
    """
    Execute query and return first record.
    
    Parameters:
    - sql: SQL query string  
    - parameters: Query parameters for binding
    
    Returns:
    First record as tuple or None if no results
    """
    pass

@deprecated(
    reason="Replaced by function `get_df`.",
    category=AirflowProviderDeprecationWarning,
    action="ignore",
)
def get_pandas_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame:
    """
    Execute query and return pandas DataFrame.
    
    DEPRECATED: Use get_df() instead.
    
    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional pandas read options
    
    Returns:
    pandas DataFrame with query results
    """
    pass

def get_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame | polars.DataFrame:
    """
    Execute query and return DataFrame (pandas or polars based on configuration).
    
    Modern replacement for get_pandas_df() with support for both pandas and polars.
    
    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional DataFrame read options
    
    Returns:
    pandas.DataFrame or polars.DataFrame with query results
    """
    pass

def _get_pandas_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame:
    """
    Internal method to get pandas DataFrame.
    
    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional pandas read options
    
    Returns:
    pandas DataFrame with query results
    """
    pass

def _get_polars_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> polars.DataFrame:
    """
    Internal method to get polars DataFrame.
    
    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional polars read options
    
    Returns:
    polars DataFrame with query results
    """
    pass

Data Insertion

Insert data into Trino tables with batch processing and transaction management.

def insert_rows(
    self,
    table: str,
    rows: Iterable[tuple],
    target_fields: Iterable[str] | None = None,
    commit_every: int = 0,
    replace: bool = False,
    **kwargs
) -> None:
    """
    Insert rows into Trino table.
    
    Parameters:
    - table: Target table name
    - rows: Iterable of tuples containing row data
    - target_fields: Names of columns to fill in the table
    - commit_every: Maximum rows to insert in one transaction (0 = all rows)
    - replace: Whether to replace instead of insert
    - **kwargs: Additional keyword arguments
    """
    pass

Transaction Management

Manage database transactions and isolation levels for consistent data operations.

def get_isolation_level(self) -> Any:
    """
    Get current transaction isolation level.
    
    Returns:
    Current isolation level setting
    """
    pass

OpenLineage Integration

Support for data lineage tracking through OpenLineage integration.

def get_openlineage_database_info(self, connection):
    """
    Get database information for OpenLineage tracking.
    
    Parameters:
    - connection: Database connection object
    
    Returns:
    Database info dict for lineage tracking
    """
    pass

def get_openlineage_database_dialect(self, _):
    """
    Get database dialect for OpenLineage.
    
    Returns:
    Database dialect identifier
    """
    pass

def get_openlineage_default_schema(self):
    """
    Get default schema for OpenLineage tracking.
    
    Returns:
    Default schema name
    """
    pass

Connection URI

Generate connection URIs for external integrations and debugging.

def get_uri(self) -> str:
    """
    Get connection URI string.
    
    Returns:
    Connection URI for the current Trino connection
    """
    pass

@staticmethod
def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
    """
    Serialize cell value for database insertion.
    
    Trino will adapt all execute() args internally, hence we return cell without any conversion.
    
    Parameters:
    - cell: The cell value to insert into the table
    - conn: The database connection (optional)
    
    Returns:
    The unmodified cell value
    """
    pass
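
The exact URI returned by `get_uri()` depends on the connection configuration, but it generally follows a `scheme://login@host:port/schema` shape. Purely as an illustration, with hypothetical values, such a URI can be decomposed with `urllib.parse`:

```python
from urllib.parse import urlsplit

# Hypothetical URI of the shape get_uri() typically returns; the real
# value depends on the configured Airflow connection.
uri = "trino://analyst@trino.example.com:8080/hive"

parts = urlsplit(uri)
print(parts.scheme)               # driver scheme
print(parts.hostname, parts.port) # cluster coordinator
print(parts.path.lstrip("/"))     # catalog component
```
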

Authentication Configuration

The hook supports multiple authentication methods configured through Airflow connection extras:

Basic Authentication

Set connection login and password fields:

# Connection configuration
login = "username"
password = "password"

JWT Authentication

Configure JWT token in connection extras:

# Via token string
extra = {"auth": "jwt", "jwt__token": "your-jwt-token"}

# Via token file
extra = {"auth": "jwt", "jwt__file": "/path/to/token.jwt"}

Certificate Authentication

Configure client certificates in connection extras:

extra = {
    "auth": "certs",
    "certs__client_cert_path": "/path/to/client.crt",
    "certs__client_key_path": "/path/to/client.key"
}

Kerberos Authentication

Configure Kerberos settings in connection extras:

extra = {
    "auth": "kerberos",
    "kerberos__config": "/path/to/krb5.conf",
    "kerberos__service_name": "trino",
    "kerberos__mutual_authentication": True,
    "kerberos__force_preemptive": False,
    "kerberos__hostname_override": "trino.example.com",
    "kerberos__principal": "user@REALM",
    "kerberos__delegate": False,
    "kerberos__ca_bundle": "/path/to/ca-bundle.crt"
}
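
Extras are stored on the Airflow connection as a JSON string, and the options for the selected method are namespaced with a `<method>__` prefix. The prefix-stripping below only illustrates that naming convention (the hook performs its own parsing internally):

```python
import json

# Kerberos extras as they would be stored in the connection's
# "extra" field (a JSON string).
extra_json = json.dumps({
    "auth": "kerberos",
    "kerberos__service_name": "trino",
    "kerberos__mutual_authentication": True,
})

extra = json.loads(extra_json)
method = extra["auth"]
# Collect only the options that belong to the chosen auth method.
options = {
    key.split("__", 1)[1]: value
    for key, value in extra.items()
    if key.startswith(f"{method}__")
}
print(options)
```
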

Usage Examples

Basic Query Execution

from airflow.providers.trino.hooks.trino import TrinoHook

# Initialize hook
hook = TrinoHook(trino_conn_id='my_trino_conn')

# Execute simple query
sql = "SELECT count(*) FROM catalog.schema.table"
result = hook.get_records(sql)
print(f"Row count: {result[0][0]}")

# Get first result
first_row = hook.get_first("SELECT * FROM catalog.schema.table LIMIT 1")
print(f"First row: {first_row}")

Working with DataFrames

from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Modern approach - get DataFrame (pandas or polars based on configuration)
sql = "SELECT id, name, value FROM catalog.schema.table LIMIT 100"
df = hook.get_df(sql)

# Legacy approach (deprecated) - emits AirflowProviderDeprecationWarning
df_legacy = hook.get_pandas_df(sql)

# Note: _get_pandas_df() and _get_polars_df() are underscore-prefixed
# internal helpers behind get_df(); prefer the public get_df() in
# application code.

# Process DataFrame
print(f"DataFrame shape: {df.shape}")
print(df.describe())

Parameterized Queries

from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Execute parameterized query
sql = "SELECT * FROM catalog.schema.table WHERE date >= ? AND status = ?"
params = ['2023-01-01', 'active']
results = hook.get_records(sql, parameters=params)

Data Insertion

from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Prepare data rows
rows = [
    (1, 'Alice', 100.5),
    (2, 'Bob', 200.0),
    (3, 'Charlie', 150.75)
]

# Insert data
hook.insert_rows(
    table='catalog.schema.target_table',
    rows=rows,
    target_fields=['id', 'name', 'value'],
    commit_every=1000
)

Helper Functions

Client Information Generation

def generate_trino_client_info() -> str:
    """
    Return JSON string with DAG context information.
    
    Includes dag_id, task_id, logical_date/execution_date, try_number,
    dag_run_id, and dag_owner from Airflow context.
    
    Returns:
    JSON string with task execution context
    """
    pass
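
The resulting string is attached to queries so they can be traced back to the originating task in Trino's query log. The sketch below is illustrative only: the keys mirror the docstring above, while the values are made up; real values are taken from the running Airflow task instance.

```python
import json

# Hypothetical payload mirroring the documented context keys; actual
# values come from the Airflow task context at runtime.
client_info = json.dumps({
    "dag_id": "example_dag",
    "task_id": "example_task",
    "execution_date": "2023-01-01T00:00:00+00:00",
    "try_number": 1,
    "dag_run_id": "manual__2023-01-01T00:00:00+00:00",
    "dag_owner": "airflow",
})

info = json.loads(client_info)
print(info["dag_id"], info["try_number"])
```
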

Exception Handling

class TrinoException(Exception):
    """
    Custom exception for Trino-related errors.
    
    Raised for Trino-specific issues and error conditions.
    """
    pass

Common error scenarios:

  • Connection authentication failures
  • Invalid SQL syntax
  • Missing tables or schemas
  • Permission errors
  • Network connectivity issues
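
A common handling pattern wraps query execution and treats Trino-specific failures separately from other errors. To keep the sketch self-contained it defines a stand-in exception and a fake executor; in a real DAG you would import `TrinoException` from `airflow.providers.trino.hooks.trino` and call `hook.get_records(sql)` instead.

```python
class TrinoException(Exception):
    """Stand-in mirroring the provider's TrinoException."""


def run_query(sql: str) -> list:
    """Fake executor standing in for hook.get_records(sql)."""
    if "missing_table" in sql:
        raise TrinoException(f"Table not found for query: {sql!r}")
    return [(1,)]


try:
    rows = run_query("SELECT * FROM catalog.schema.missing_table")
except TrinoException as exc:
    # Trino-specific failure: log it and fall back to an empty result.
    rows = []
    print(f"Trino error: {exc}")

print(rows)
```
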

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-trino
