# Database Operations

Core database functionality for connecting to Trino clusters and executing SQL operations. `TrinoHook` supports basic, JWT, certificate, and Kerberos authentication, manages connections, and offers several query execution patterns: fetching records, fetching DataFrames, and inserting rows.

## Capabilities

### Connection Management

Establishes and manages connections to Trino clusters. Authentication and other connection options are read from the Airflow connection.

```python { .api }
class TrinoHook(DbApiHook):
    """
    Interact with Trino through trino package.

    Attributes:
    - conn_name_attr: str = "trino_conn_id"
    - default_conn_name: str = "trino_default"
    - conn_type: str = "trino"
    - hook_name: str = "Trino"
    - strip_semicolon: bool = True
    - query_id: str = ""
    """

    def __init__(self, *args, **kwargs):
        """Initialize the TrinoHook."""
        pass

    def get_conn(self) -> Connection:
        """
        Return a connection object with proper authentication.

        Supports multiple authentication methods:
        - Basic authentication (username/password)
        - JWT authentication (token or file)
        - Certificate authentication (client certs)
        - Kerberos authentication

        Returns:
            Connection object configured with specified authentication
        """
        pass

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return custom field behaviour for Airflow UI.

        Returns:
            Dict with UI field configuration for connection form
        """
        pass
```

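`get_conn()` chooses among the authentication methods listed in its docstring. The following is a simplified, self-contained sketch of that selection step. It assumes the `auth` extra key and the values `jwt`, `certs`, and `kerberos` documented under Authentication Configuration, and it assumes that a password on the connection implies basic authentication. `select_auth_method` is hypothetical: it only returns a method name and does not build trino client auth objects.

```python
from typing import Any, Optional


def select_auth_method(extra: dict[str, Any], password: Optional[str]) -> str:
    """Hypothetical helper: pick an auth method name from connection fields.

    Mirrors the extras documented in this section; it is NOT TrinoHook code.
    """
    auth = extra.get("auth")
    if password:
        # Assumption: a password means basic auth and cannot be combined
        # with certificate or Kerberos authentication.
        if auth in ("certs", "kerberos"):
            raise ValueError(f"auth={auth!r} does not accept a password")
        return "basic"
    if auth in ("jwt", "certs", "kerberos"):
        return auth
    # No password and no recognized auth extra: no authentication configured.
    return "none"


print(select_auth_method({"auth": "jwt", "jwt__token": "your-jwt-token"}, None))  # jwt
```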
### Query Execution

Execute SQL queries against Trino with parameter binding and several result formats: all records, the first record, or a DataFrame.

```python { .api }
def get_records(
    self,
    sql: str,
    parameters=None
) -> list:
    """
    Execute query and return all records.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding

    Returns:
        List of tuples containing query results
    """
    pass

def get_first(
    self,
    sql: str,
    parameters=None
) -> Any:
    """
    Execute query and return first record.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding

    Returns:
        First record as tuple or None if no results
    """
    pass

@deprecated(
    reason="Replaced by function `get_df`.",
    category=AirflowProviderDeprecationWarning,
    action="ignore",
)
def get_pandas_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame:
    """
    Execute query and return pandas DataFrame.

    DEPRECATED: Use get_df() instead.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional pandas read options

    Returns:
        pandas DataFrame with query results
    """
    pass

def get_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame | polars.DataFrame:
    """
    Execute query and return a pandas or polars DataFrame.

    Replacement for get_pandas_df() with support for both pandas and polars.
    The inherited DbApiHook.get_df() selects the library through the df_type
    keyword argument ("pandas" by default, or "polars").

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional DataFrame read options

    Returns:
        pandas.DataFrame or polars.DataFrame with query results
    """
    pass

def _get_pandas_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame:
    """
    Internal method to get pandas DataFrame.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional pandas read options

    Returns:
        pandas DataFrame with query results
    """
    pass

def _get_polars_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> polars.DataFrame:
    """
    Internal method to get polars DataFrame.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional polars read options

    Returns:
        polars DataFrame with query results
    """
    pass
```

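Under the DB-API 2.0 specification (PEP 249), `cursor.description` holds one 7-item sequence per result column, with the column name first. A DataFrame method therefore needs only the fetched rows plus those names. The sketch below builds a column-oriented dict from both without pandas or polars. `rows_to_columns` and the sample data are hypothetical, and the hook's actual DataFrame construction may differ.

```python
from typing import Any, Sequence


def rows_to_columns(
    rows: Sequence[Sequence[Any]],
    description: Sequence[Sequence[Any]],
) -> dict[str, list[Any]]:
    """Hypothetical helper: turn DB-API rows into a column -> values mapping."""
    # PEP 249: the first item of each description entry is the column name.
    names = [col[0] for col in description]
    columns: dict[str, list[Any]] = {name: [] for name in names}
    for row in rows:
        for name, value in zip(names, row):
            columns[name].append(value)
    return columns


# Sample data shaped like a DB-API result; the values are made up.
description = [
    ("id", "integer", None, None, None, None, None),
    ("name", "varchar", None, None, None, None, None),
]
rows = [(1, "Alice"), (2, "Bob")]
print(rows_to_columns(rows, description))  # {'id': [1, 2], 'name': ['Alice', 'Bob']}
```

A mapping of this shape can be passed to `pandas.DataFrame(...)` or `polars.DataFrame(...)`. The sketch assumes unique column names; duplicates would be merged into one list.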
### Data Insertion

Insert data into Trino tables with batch processing and transaction management.

```python { .api }
def insert_rows(
    self,
    table: str,
    rows: Iterable[tuple],
    target_fields: Iterable[str] | None = None,
    commit_every: int = 0,
    replace: bool = False,
    **kwargs
) -> None:
    """
    Insert rows into Trino table.

    Parameters:
    - table: Target table name
    - rows: Iterable of tuples containing row data
    - target_fields: Names of columns to fill in the table
    - commit_every: Maximum rows to insert in one transaction (0 = all rows)
    - replace: Whether to replace instead of insert
    - **kwargs: Additional keyword arguments
    """
    pass
```

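The `commit_every` rule can be illustrated by splitting rows into per-transaction batches, where `0` means a single batch holding every row. This is a hypothetical sketch of the batching idea only. `batches` is not part of the hook, and it does not open or commit any transactions.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def batches(rows: Iterable[tuple], commit_every: int) -> Iterator[list[tuple]]:
    """Hypothetical helper: split rows into per-transaction batches.

    commit_every <= 0 yields every row in a single batch, following the
    "0 = all rows" rule documented for insert_rows().
    """
    it = iter(rows)
    if commit_every <= 0:
        batch = list(it)
        if batch:
            yield batch
        return
    while True:
        # Take up to commit_every rows; an empty slice means the input is exhausted.
        batch = list(islice(it, commit_every))
        if not batch:
            return
        yield batch


rows = [(1, "Alice", 100.5), (2, "Bob", 200.0), (3, "Charlie", 150.75)]
print([len(b) for b in batches(rows, commit_every=2)])  # [2, 1]
print([len(b) for b in batches(rows, commit_every=0)])  # [3]
```

Committing after each batch keeps transactions bounded in size. The trade-off is that a failure in a later batch leaves earlier batches already committed.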
### Transaction Management

Manage database transactions and isolation levels for consistent data operations.

```python { .api }
def get_isolation_level(self) -> Any:
    """
    Get current transaction isolation level.

    Returns:
        Current isolation level setting
    """
    pass
```

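Here is a sketch of how a configured isolation-level name might be resolved. It assumes the level is given as a case-insensitive string and that missing or unknown values fall back to autocommit. The `IsolationLevel` enum is defined locally for illustration. The trino client ships its own enum, and this sketch does not claim to reproduce its exact members or values. `resolve_isolation_level` is hypothetical.

```python
from enum import Enum
from typing import Optional


class IsolationLevel(Enum):
    # Local stand-in enum; not imported from the trino package.
    AUTOCOMMIT = "AUTOCOMMIT"
    READ_UNCOMMITTED = "READ_UNCOMMITTED"
    READ_COMMITTED = "READ_COMMITTED"
    REPEATABLE_READ = "REPEATABLE_READ"
    SERIALIZABLE = "SERIALIZABLE"


def resolve_isolation_level(value: Optional[str]) -> IsolationLevel:
    """Hypothetical helper: map a configuration string to an IsolationLevel."""
    if not value:
        return IsolationLevel.AUTOCOMMIT
    # Assumption: names match case-insensitively; unknown names fall back to AUTOCOMMIT.
    return IsolationLevel.__members__.get(value.upper(), IsolationLevel.AUTOCOMMIT)


print(resolve_isolation_level("read_committed"))  # IsolationLevel.READ_COMMITTED
```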
### OpenLineage Integration

Support for data lineage tracking through OpenLineage integration.

```python { .api }
def get_openlineage_database_info(self, connection):
    """
    Get database information for OpenLineage tracking.

    Parameters:
    - connection: Database connection object

    Returns:
        Database info dict for lineage tracking
    """
    pass

def get_openlineage_database_dialect(self, _):
    """
    Get database dialect for OpenLineage.

    Returns:
        Database dialect identifier
    """
    pass

def get_openlineage_default_schema(self):
    """
    Get default schema for OpenLineage tracking.

    Returns:
        Default schema name
    """
    pass
```

### Connection URI

Generate connection URIs for external integrations and debugging.

```python { .api }
def get_uri(self) -> str:
    """
    Get connection URI string.

    Returns:
        Connection URI for the current Trino connection
    """
    pass

@staticmethod
def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
    """
    Serialize cell value for database insertion.

    Trino will adapt all execute() args internally, hence we return cell without any conversion.

    Parameters:
    - cell: The cell value to insert into the table
    - conn: The database connection (optional)

    Returns:
        The unmodified cell value
    """
    pass
```

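A connection URI is usually assembled from the connection's host, port, credentials, catalog, and schema, with the credentials percent-encoded. The sketch below assumes a `trino://user:password@host:port/catalog/schema` layout, the form used by the trino SQLAlchemy dialect. `build_trino_uri` is hypothetical, and the exact string `get_uri()` returns (including any query parameters) may differ.

```python
from typing import Optional
from urllib.parse import quote


def build_trino_uri(
    host: str,
    port: int,
    user: Optional[str] = None,
    password: Optional[str] = None,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
) -> str:
    """Hypothetical helper: compose a trino:// URI from connection fields."""
    auth = ""
    if user:
        # Percent-encode credentials so characters like '@' or ':' stay unambiguous.
        auth = quote(user, safe="")
        if password:
            auth += ":" + quote(password, safe="")
        auth += "@"
    uri = f"trino://{auth}{host}:{port}"
    if catalog:
        uri += "/" + quote(catalog, safe="")
        if schema:
            uri += "/" + quote(schema, safe="")
    return uri


print(build_trino_uri("trino.example.com", 8080, "alice", "p@ss", "hive", "default"))
```

Because such a URI can embed a password, avoid writing it to logs when using it for debugging.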
## Authentication Configuration

The hook supports several authentication methods, configured through the Airflow connection's login and password fields and its extras:

### Basic Authentication

Set the connection's login and password fields:

```python
# Connection configuration
login = "username"
password = "password"
```

### JWT Authentication

Configure the JWT in the connection extras, either as a token string or as a path to a token file:

```python
# Via token string
extra = {"auth": "jwt", "jwt__token": "your-jwt-token"}

# Via token file
extra = {"auth": "jwt", "jwt__file": "/path/to/token.jwt"}
```

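The two JWT variants above can be resolved to a single token string. This hypothetical sketch assumes that exactly one of `jwt__token` or `jwt__file` must be set. It also strips surrounding whitespace from the file contents, which is a choice made here and not necessarily the hook's behaviour. `resolve_jwt` is not part of the provider.

```python
import tempfile
from pathlib import Path
from typing import Any


def resolve_jwt(extra: dict[str, Any]) -> str:
    """Hypothetical helper: read the JWT from jwt__token or jwt__file.

    Assumption: exactly one of the two keys must be present.
    """
    has_token = "jwt__token" in extra
    has_file = "jwt__file" in extra
    if has_token == has_file:
        raise ValueError("set exactly one of 'jwt__token' or 'jwt__file'")
    if has_token:
        return extra["jwt__token"]
    # This sketch strips whitespace such as a trailing newline in the file.
    return Path(extra["jwt__file"]).read_text().strip()


# Usage: write a throwaway token file and resolve it both ways.
with tempfile.TemporaryDirectory() as tmp:
    token_path = Path(tmp) / "token.jwt"
    token_path.write_text("example-token\n")
    print(resolve_jwt({"auth": "jwt", "jwt__file": str(token_path)}))  # example-token
print(resolve_jwt({"auth": "jwt", "jwt__token": "inline-token"}))  # inline-token
```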
### Certificate Authentication

Configure client certificates in connection extras:

```python
extra = {
    "auth": "certs",
    "certs__client_cert_path": "/path/to/client.crt",
    "certs__client_key_path": "/path/to/client.key"
}
```

### Kerberos Authentication

Configure Kerberos settings in connection extras:

```python
extra = {
    "auth": "kerberos",
    "kerberos__config": "/path/to/krb5.conf",
    "kerberos__service_name": "trino",
    "kerberos__mutual_authentication": True,
    "kerberos__force_preemptive": False,
    "kerberos__hostname_override": "trino.example.com",
    "kerberos__principal": "user@REALM",
    "kerberos__delegate": False,
    "kerberos__ca_bundle": "/path/to/ca-bundle.crt"
}
```

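The extras above follow a `<method>__<option>` naming pattern (`kerberos__...`, `certs__...`, `jwt__...`). This sketch collects one method's options into a plain dict by stripping the prefix. `options_for` is hypothetical, and the hook may instead read each key individually with its own defaults.

```python
from typing import Any


def options_for(extra: dict[str, Any], method: str) -> dict[str, Any]:
    """Hypothetical helper: collect '<method>__<option>' extras as {option: value}."""
    prefix = f"{method}__"
    return {
        key[len(prefix):]: value
        for key, value in extra.items()
        if key.startswith(prefix)
    }


extra = {
    "auth": "kerberos",
    "kerberos__service_name": "trino",
    "kerberos__mutual_authentication": True,
    "certs__client_cert_path": "/path/to/client.crt",
}
print(options_for(extra, "kerberos"))
# {'service_name': 'trino', 'mutual_authentication': True}
```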
## Usage Examples

### Basic Query Execution

```python
from airflow.providers.trino.hooks.trino import TrinoHook

# Initialize hook
hook = TrinoHook(trino_conn_id='my_trino_conn')

# Execute simple query
sql = "SELECT count(*) FROM catalog.schema.table"
result = hook.get_records(sql)
print(f"Row count: {result[0][0]}")

# Get first result
first_row = hook.get_first("SELECT * FROM catalog.schema.table LIMIT 1")
print(f"First row: {first_row}")
```

### Working with DataFrames

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

sql = "SELECT id, name, value FROM catalog.schema.table LIMIT 100"

# Recommended: get_df() returns a pandas DataFrame by default
df = hook.get_df(sql)

# Or request the DataFrame library explicitly via df_type
df_pandas = hook.get_df(sql, df_type="pandas")
df_polars = hook.get_df(sql, df_type="polars")

# Legacy approach (deprecated; use get_df() instead)
df_legacy = hook.get_pandas_df(sql)

# Process DataFrame
print(f"DataFrame shape: {df.shape}")
print(df.describe())
```

### Parameterized Queries

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Execute parameterized query
sql = "SELECT * FROM catalog.schema.table WHERE date >= ? AND status = ?"
params = ['2023-01-01', 'active']
results = hook.get_records(sql, parameters=params)
```

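The `?` placeholders in the query above follow the DB-API "qmark" parameter style. Values are passed separately and bound in order, not formatted into the SQL text. The self-contained sketch below uses Python's built-in `sqlite3` module only as a stand-in driver that also uses qmark placeholders, so it runs without a Trino cluster. The `events` table and its rows are made up, and with Trino the binding is done by the trino client.

```python
import sqlite3

# In-memory SQLite database as a stand-in for a Trino connection.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, event_date TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [(1, "2022-12-31", "active"), (2, "2023-01-05", "active"), (3, "2023-02-01", "closed")],
)

# Values are bound positionally to the ? placeholders, never spliced into the SQL string.
sql = "SELECT id FROM events WHERE event_date >= ? AND status = ?"
params = ["2023-01-01", "active"]
results = conn.execute(sql, params).fetchall()
print(results)  # [(2,)]
conn.close()
```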
### Data Insertion

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Prepare data rows
rows = [
    (1, 'Alice', 100.5),
    (2, 'Bob', 200.0),
    (3, 'Charlie', 150.75)
]

# Insert data
hook.insert_rows(
    table='catalog.schema.target_table',
    rows=rows,
    target_fields=['id', 'name', 'value'],
    commit_every=1000
)
```

## Helper Functions

### Client Information Generation

```python { .api }
def generate_trino_client_info() -> str:
    """
    Return JSON string with DAG context information.

    Includes dag_id, task_id, logical_date/execution_date, try_number,
    dag_run_id, and dag_owner from Airflow context.

    Returns:
        JSON string with task execution context
    """
    pass
```

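Here is a minimal sketch of producing such a JSON string. It assumes the values come from Airflow's `AIRFLOW_CTX_*` task-context environment variables (for example `AIRFLOW_CTX_DAG_ID`). The environment mapping is passed in explicitly so the sketch runs outside a task. `build_client_info` and the variable-to-key mapping are assumptions, not the provider's implementation.

```python
import json
from typing import Mapping

# Assumed mapping from JSON keys to Airflow context environment variables.
CONTEXT_ENV_VARS = {
    "dag_id": "AIRFLOW_CTX_DAG_ID",
    "task_id": "AIRFLOW_CTX_TASK_ID",
    "logical_date": "AIRFLOW_CTX_LOGICAL_DATE",
    "try_number": "AIRFLOW_CTX_TRY_NUMBER",
    "dag_run_id": "AIRFLOW_CTX_DAG_RUN_ID",
    "dag_owner": "AIRFLOW_CTX_DAG_OWNER",
}


def build_client_info(env: Mapping[str, str]) -> str:
    """Hypothetical helper: serialize task context; missing values become ""."""
    info = {key: env.get(var, "") for key, var in CONTEXT_ENV_VARS.items()}
    # Assumption: some Airflow versions expose the date as AIRFLOW_CTX_EXECUTION_DATE.
    if not info["logical_date"]:
        info["logical_date"] = env.get("AIRFLOW_CTX_EXECUTION_DATE", "")
    # sort_keys keeps the output stable regardless of insertion order.
    return json.dumps(info, sort_keys=True)


print(build_client_info({"AIRFLOW_CTX_DAG_ID": "etl", "AIRFLOW_CTX_TASK_ID": "load"}))
```

Inside a running task, `os.environ` could be passed as `env`.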
## Exception Handling

```python { .api }
class TrinoException(Exception):
    """
    Custom exception for Trino-related errors.

    Raised for Trino-specific issues and error conditions.
    """
    pass
```

Common error scenarios:
- Connection authentication failures
- Invalid SQL syntax
- Missing tables or schemas
- Permission errors
- Network connectivity issues
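A common pattern for a hook-level exception such as `TrinoException` is to catch the driver's error and re-raise it with `raise ... from`. Callers then handle a single exception type, and the original error stays attached as `__cause__`. The sketch below is self-contained. `DriverError` and `run_query` are hypothetical stand-ins for a driver error and a hook method, the error message is simulated, and `TrinoException` is redefined locally so the code runs without Airflow.

```python
class DriverError(Exception):
    """Hypothetical stand-in for an error raised by the database driver."""


class TrinoException(Exception):
    """Local copy of the hook's exception type, for a self-contained example."""


def run_query(sql: str) -> list:
    """Hypothetical query function that fails when a table is missing."""
    try:
        if "missing_table" in sql:
            raise DriverError("table not found (simulated)")
        return [(1,)]
    except DriverError as exc:
        # Chain the driver error so the root cause stays visible in tracebacks.
        raise TrinoException(str(exc)) from exc


try:
    run_query("SELECT * FROM catalog.schema.missing_table")
except TrinoException as err:
    print(f"Query failed: {err} (caused by {type(err.__cause__).__name__})")
```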