
# Apache Airflow Providers Trino

A provider package that integrates Apache Airflow with Trino (formerly PrestoSQL) for database operations, data transfers, and connection management. This provider enables users to execute SQL queries against Trino clusters, transfer data from external sources like Google Cloud Storage to Trino tables, and manage Trino connections with various authentication methods.

## Package Information

- **Package Name**: apache-airflow-providers-trino
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-trino`

## Core Imports

```python
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.providers.trino.transfers.gcs_to_trino import GCSToTrinoOperator
```

For provider metadata:

```python
from airflow.providers.trino.get_provider_info import get_provider_info
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook


def query_trino():
    # Create Trino hook with connection
    hook = TrinoHook(trino_conn_id='trino_default')

    # Execute a simple query
    sql = "SELECT count(*) FROM catalog.schema.table"
    result = hook.get_records(sql)
    print(f"Query result: {result}")

    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT * FROM catalog.schema.table LIMIT 10")
    print(df.head())


# Define DAG
dag = DAG(
    'trino_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

# Add task
query_task = PythonOperator(
    task_id='query_trino',
    python_callable=query_trino,
    dag=dag,
)
```

## Architecture

The provider is built around three core components:

- **Hooks**: Database connection and query execution through TrinoHook, supporting multiple authentication methods (Basic, JWT, Certificates, Kerberos)
- **Transfers**: Data movement operators like GCSToTrinoOperator for loading external data into Trino tables
- **Assets**: URI handling and validation for Trino resource references with proper catalog/schema/table addressing

This design enables comprehensive Trino integration within Airflow workflows, from simple query execution to complex data pipeline orchestration with external data sources.

## Capabilities

### Database Operations

Core database functionality for connecting to Trino clusters and executing SQL operations. Supports query execution, connection management, and multiple authentication methods including Basic, JWT, Certificates, and Kerberos.

```python { .api }
class TrinoHook(DbApiHook):
    def get_conn(self) -> Connection: ...
    def get_records(self, sql: str, parameters=None) -> list: ...
    def get_first(self, sql: str, parameters=None) -> Any: ...
    def get_pandas_df(self, sql: str = "", parameters=None, **kwargs) -> pandas.DataFrame: ...
    def insert_rows(self, table: str, rows: Iterable[tuple], target_fields: Iterable[str] | None = None, commit_every: int = 0, replace: bool = False, **kwargs) -> None: ...
```

[Database Operations](./database-operations.md)

### Data Transfers

Transfer operators for moving data from external sources into Trino tables. Currently supports Google Cloud Storage to Trino transfers with CSV file processing and flexible schema mapping.

```python { .api }
class GCSToTrinoOperator(BaseOperator):
    def __init__(
        self,
        *,
        source_bucket: str,
        source_object: str,
        trino_table: str,
        trino_conn_id: str = "trino_default",
        gcp_conn_id: str = "google_cloud_default",
        schema_fields: Iterable[str] | None = None,
        schema_object: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ): ...
```
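Conceptually, the operator downloads the CSV object and inserts its rows into the target table through the hook. The CSV-to-rows step can be sketched in pure Python; the sample data and the header-skipping behavior here are illustrative assumptions, not the operator's exact implementation:

```python
import csv
import io

# Hypothetical CSV content as it might be downloaded from GCS.
csv_bytes = b"id,name\n1,alice\n2,bob\n"

# Plays the role of the operator's schema_fields argument: the column names
# the rows will be mapped to in the Trino table.
schema_fields = ["id", "name"]

reader = csv.reader(io.StringIO(csv_bytes.decode("utf-8")))
header = next(reader)  # assume the first row is a header and skip it
rows = [tuple(r) for r in reader]

# Rows in this shape are what something like
# TrinoHook.insert_rows(trino_table, rows, schema_fields) would consume.
```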

[Data Transfers](./data-transfers.md)

### Asset Management

URI validation and management for Trino data assets. Provides standardized handling of Trino URIs with proper format validation and default port configuration.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```
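A sketch of the kind of normalization such a function performs, not the provider's actual implementation: validate the `trino://` scheme, require catalog/schema/table path segments, and fill in a default port (8080 is assumed here):

```python
from urllib.parse import SplitResult, urlsplit

def sanitize_uri_sketch(uri: SplitResult) -> SplitResult:
    # Illustrative only: validate scheme and catalog/schema/table addressing.
    if uri.scheme != "trino":
        raise ValueError("expected a trino:// URI")
    if len(uri.path.strip("/").split("/")) != 3:
        raise ValueError("URI must reference catalog/schema/table")
    if uri.port is None:
        # Assumed default Trino port.
        uri = uri._replace(netloc=f"{uri.hostname}:8080")
    return uri

parts = sanitize_uri_sketch(urlsplit("trino://trino.example.com/hive/sales/orders"))
```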

[Asset Management](./asset-management.md)

## Connection Configuration

Trino connections support various authentication methods configured through Airflow connection extras:

- **Basic Authentication**: Username/password via connection login and password fields
- **JWT Authentication**: JSON Web Token via `jwt__token` or `jwt__file` extras
- **Certificate Authentication**: Client certificates via `certs__client_cert_path` and `certs__client_key_path` extras
- **Kerberos Authentication**: Kerberos configuration via various `kerberos__*` extras

Additional configuration options include `session_properties`, `client_tags`, and `timezone` settings.
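One common way to supply such a connection is Airflow's URI-style `AIRFLOW_CONN_<CONN_ID>` environment variable, where query parameters become connection extras. A minimal sketch (host, user, and token values are placeholders):

```python
import os
from urllib.parse import quote

# Hypothetical extras matching the keys described above.
extras = {"jwt__token": "<token>", "timezone": "UTC"}

# Query parameters in an Airflow connection URI land in the extras dict.
query = "&".join(f"{k}={quote(v)}" for k, v in extras.items())
conn_uri = f"trino://analyst@trino.example.com:8080/?{query}"

# Airflow picks this up as the trino_default connection.
os.environ["AIRFLOW_CONN_TRINO_DEFAULT"] = conn_uri
```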

## Types

```python { .api }
# Type imports
from collections.abc import Iterable, Sequence
from urllib.parse import SplitResult

import pandas
import trino

class TrinoException(Exception):
    """Custom exception for Trino-related errors."""

# Connection type from the trino package
Connection = trino.dbapi.Connection

# Authentication classes from trino.auth
BasicAuthentication = trino.auth.BasicAuthentication
JWTAuthentication = trino.auth.JWTAuthentication
CertificateAuthentication = trino.auth.CertificateAuthentication
KerberosAuthentication = trino.auth.KerberosAuthentication
```