# Apache Airflow Providers Trino

A provider package that integrates Apache Airflow with Trino (formerly PrestoSQL) for database operations, data transfers, and connection management. This provider enables users to execute SQL queries against Trino clusters, transfer data from external sources like Google Cloud Storage to Trino tables, and manage Trino connections with various authentication methods.

## Package Information

- **Package Name**: apache-airflow-providers-trino
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-trino`

## Core Imports

```python
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
```

For provider metadata:

```python
from airflow.providers.trino.get_provider_info import get_provider_info
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook


def query_trino():
    # Create a Trino hook from the configured Airflow connection
    hook = TrinoHook(trino_conn_id='trino_default')

    # Execute a simple query
    sql = "SELECT count(*) FROM catalog.schema.table"
    result = hook.get_records(sql)
    print(f"Query result: {result}")

    # Fetch results as a pandas DataFrame
    df = hook.get_pandas_df("SELECT * FROM catalog.schema.table LIMIT 10")
    print(df.head())


# Define the DAG
dag = DAG(
    'trino_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

# Add the task
query_task = PythonOperator(
    task_id='query_trino',
    python_callable=query_trino,
    dag=dag,
)
```

## Architecture

The provider is built around three core components:

- **Hooks**: Database connection and query execution through TrinoHook, supporting multiple authentication methods (Basic, JWT, Certificates, Kerberos)
- **Transfers**: Data movement operators like GCSToTrinoOperator for loading external data into Trino tables
- **Assets**: URI handling and validation for Trino resource references with proper catalog/schema/table addressing

This design enables comprehensive Trino integration within Airflow workflows, from simple query execution to complex data pipeline orchestration with external data sources.
69
70
## Capabilities
71
72
### Database Operations
73
74
Core database functionality for connecting to Trino clusters and executing SQL operations. Supports query execution, connection management, and multiple authentication methods including Basic, JWT, Certificates, and Kerberos.
75
76
```python { .api }
77
class TrinoHook(DbApiHook):
78
def get_conn(self) -> Connection: ...
79
def get_records(self, sql: str, parameters=None) -> list: ...
80
def get_first(self, sql: str, parameters=None) -> Any: ...
81
def get_pandas_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame: ...
82
def insert_rows(self, table: str, rows: Iterable[tuple], target_fields: Iterable[str] | None = None, commit_every: int = 0, replace: bool = False, **kwargs) -> None: ...
83
```
84
85
[Database Operations](./database-operations.md)
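
For intuition, `insert_rows` turns a table name, rows, and optional `target_fields` into a parameterized `INSERT` statement. The helper below is an illustrative sketch of that statement-building step only, not the provider's internal code:

```python
def build_insert_sql(table, row_width, target_fields=None):
    """Build a parameterized INSERT statement of the kind insert_rows issues.

    Illustrative sketch; the real hook also handles batching via
    commit_every and value serialization.
    """
    placeholders = ", ".join(["?"] * row_width)
    if target_fields:
        columns = "(" + ", ".join(target_fields) + ") "
    else:
        columns = ""
    return f"INSERT INTO {table} {columns}VALUES ({placeholders})"


print(build_insert_sql("catalog.schema.users", 2, ["id", "name"]))
# → INSERT INTO catalog.schema.users (id, name) VALUES (?, ?)
```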
86
87
### Data Transfers
88
89
Transfer operators for moving data from external sources into Trino tables. Currently supports Google Cloud Storage to Trino transfers with CSV file processing and flexible schema mapping.
90
91
```python { .api }
92
class GCSToTrinoOperator(BaseOperator):
93
def __init__(
94
self,
95
*,
96
source_bucket: str,
97
source_object: str,
98
trino_table: str,
99
trino_conn_id: str = "trino_default",
100
gcp_conn_id: str = "google_cloud_default",
101
schema_fields: Iterable[str] | None = None,
102
schema_object: str | None = None,
103
impersonation_chain: str | Sequence[str] | None = None,
104
**kwargs
105
): ...
106
```
107
108
[Data Transfers](./data-transfers.md)
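
Conceptually the operator downloads the CSV object from GCS, parses it into row tuples, and loads them via `TrinoHook.insert_rows`. A hedged pure-Python sketch of just the CSV-to-rows step (the function name is illustrative; the real operator streams the object and may apply `schema_fields` or `schema_object`):

```python
import csv
import io


def csv_to_rows(csv_text):
    """Parse CSV content into tuples suitable for TrinoHook.insert_rows.

    Illustrative sketch of the parsing step only.
    """
    reader = csv.reader(io.StringIO(csv_text))
    return [tuple(row) for row in reader]


rows = csv_to_rows("1,alice\n2,bob\n")
# hook.insert_rows(table="catalog.schema.users", rows=rows,
#                  target_fields=["id", "name"])
```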
109
110
### Asset Management
111
112
URI validation and management for Trino data assets. Provides standardized handling of Trino URIs with proper format validation and default port configuration.
113
114
```python { .api }
115
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
116
```
117
118
[Asset Management](./asset-management.md)
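
To illustrate the kind of normalization such a function performs, the sketch below validates a `trino://host/catalog/schema/table` URI and fills in a default port. It is an assumption-laden stand-in, not the provider's `sanitize_uri`; 8080 is used here as the conventional Trino port, and the exact validation rules may differ:

```python
from urllib.parse import SplitResult, urlsplit


def sanitize_trino_uri(uri: str) -> SplitResult:
    """Validate a trino:// URI and apply a default port (illustrative sketch)."""
    parts = urlsplit(uri)
    if parts.scheme != "trino":
        raise ValueError("URI scheme must be trino://")
    if not parts.netloc:
        raise ValueError("Trino URI must include a host")
    # Expect exactly catalog/schema/table in the path
    if len(parts.path.strip("/").split("/")) != 3:
        raise ValueError("Trino URI path must be catalog/schema/table")
    if parts.port is None:
        # Fill in the conventional Trino port (assumed default)
        parts = parts._replace(netloc=f"{parts.hostname}:8080")
    return parts


print(sanitize_trino_uri("trino://localhost/hive/default/orders").netloc)
# → localhost:8080
```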

## Connection Configuration

Trino connections support various authentication methods configured through Airflow connection extras:

- **Basic Authentication**: Username/password via connection login and password fields
- **JWT Authentication**: JSON Web Token via `jwt__token` or `jwt__file` extras
- **Certificate Authentication**: Client certificates via `certs__client_cert_path` and `certs__client_key_path` extras
- **Kerberos Authentication**: Kerberos configuration via various `kerberos__*` extras

Additional configuration options include `session_properties`, `client_tags`, and `timezone` settings.
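
Airflow connections can also be supplied as a connection URI, for example through an `AIRFLOW_CONN_TRINO_DEFAULT` environment variable, with extras encoded as query parameters. A sketch of assembling such a URI for JWT auth; the host, port, and token values are placeholders, and the extras keys follow the `jwt__token` convention listed above:

```python
import json
from urllib.parse import urlencode

# Extras become query parameters in the Airflow connection URI
extras = {
    "jwt__token": "my-token",  # placeholder token
    "session_properties": json.dumps({"query_max_run_time": "1h"}),
}
conn_uri = "trino://trino-host:8080/?" + urlencode(extras)
print(conn_uri)
```

Exporting this string as `AIRFLOW_CONN_TRINO_DEFAULT` makes it available to `TrinoHook(trino_conn_id='trino_default')` without touching the metadata database.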

## Types

```python { .api }
# Type imports
from collections.abc import Iterable, Sequence
from urllib.parse import SplitResult

import pandas
import trino

class TrinoException(Exception):
    """Custom exception for Trino-related errors."""
    pass

# Connection and authentication types from the trino package
Connection = trino.dbapi.Connection

# Authentication classes from trino.auth
BasicAuthentication = trino.auth.BasicAuthentication
JWTAuthentication = trino.auth.JWTAuthentication
CertificateAuthentication = trino.auth.CertificateAuthentication
KerberosAuthentication = trino.auth.KerberosAuthentication
```