0
# Low-Level Client
1
2
The low-level client is a direct implementation of the Trino HTTP protocol that provides fine-grained control over communication with Trino. It is useful for advanced use cases that require custom session management, custom request handling, or integration with existing HTTP infrastructure.
3
4
## Capabilities
5
6
### Client Session Management
7
8
Thread-safe session state management for Trino connections, handling user authentication, catalog/schema context, properties, headers, and transaction state.
9
10
```python { .api }
11
class ClientSession:
12
def __init__(
13
self,
14
user: str,
15
authorization_user: Optional[str] = None,
16
catalog: Optional[str] = None,
17
schema: Optional[str] = None,
18
source: Optional[str] = None,
19
properties: Optional[Dict[str, str]] = None,
20
headers: Optional[Dict[str, str]] = None,
21
transaction_id: Optional[str] = None,
22
extra_credential: Optional[List[Tuple[str, str]]] = None,
23
client_tags: Optional[List[str]] = None,
24
roles: Optional[Union[Dict[str, str], str]] = None,
25
timezone: Optional[str] = None,
26
encoding: Optional[Union[str, List[str]]] = None
27
)
28
29
@property
30
def user(self) -> str
31
"""Primary user for query execution."""
32
33
@property
34
def authorization_user(self) -> Optional[str]
35
"""User for authorization (different from query user for impersonation)."""
36
37
@authorization_user.setter
38
def authorization_user(self, authorization_user: Optional[str]) -> None
39
40
@property
41
def catalog(self) -> Optional[str]
42
"""Default catalog for queries."""
43
44
@catalog.setter
45
def catalog(self, catalog: Optional[str]) -> None
46
47
@property
48
def schema(self) -> Optional[str]
49
"""Default schema for queries."""
50
51
@schema.setter
52
def schema(self, schema: Optional[str]) -> None
53
54
@property
55
def source(self) -> Optional[str]
56
"""Query source identifier."""
57
58
@property
59
def properties(self) -> Dict[str, str]
60
"""Session properties dictionary."""
61
62
@properties.setter
63
def properties(self, properties: Dict[str, str]) -> None
64
65
@property
66
def headers(self) -> Dict[str, str]
67
"""Additional HTTP headers."""
68
69
@property
70
def transaction_id(self) -> Optional[str]
71
"""Current transaction ID."""
72
73
@transaction_id.setter
74
def transaction_id(self, transaction_id: Optional[str]) -> None
75
76
@property
77
def extra_credential(self) -> Optional[List[Tuple[str, str]]]
78
"""Extra credential key-value pairs."""
79
80
@property
81
def client_tags(self) -> List[str]
82
"""Client tags for query identification."""
83
84
@property
85
def roles(self) -> Dict[str, str]
86
"""Authorization roles per catalog."""
87
88
@roles.setter
89
def roles(self, roles: Dict[str, str]) -> None
90
91
@property
92
def prepared_statements(self) -> Dict[str, str]
93
"""Prepared statement name to SQL mapping."""
94
95
@prepared_statements.setter
96
def prepared_statements(self, prepared_statements: Dict[str, str]) -> None
97
98
@property
99
def timezone(self) -> str
100
"""Session timezone."""
101
102
@property
103
def encoding(self) -> Union[str, List[str]]
104
"""Spooled protocol encoding preferences."""
105
```
106
107
### HTTP Request Management
108
109
Low-level HTTP request handling with automatic retry logic, authentication integration, and comprehensive error handling.
110
111
```python { .api }
112
class TrinoRequest:
113
def __init__(
114
self,
115
host: str,
116
port: int,
117
client_session: ClientSession,
118
http_session: Optional[Session] = None,
119
http_scheme: Optional[str] = None,
120
auth: Optional[Authentication] = None,
121
max_attempts: int = 3,
122
request_timeout: Union[float, Tuple[float, float]] = 30.0,
123
handle_retry: _RetryWithExponentialBackoff = None,
124
verify: bool = True
125
)
126
127
def post(self, sql: str, additional_http_headers: Optional[Dict[str, Any]] = None) -> Response
128
"""
129
Submit SQL query to Trino coordinator.
130
131
Parameters:
132
- sql: SQL statement to execute
133
- additional_http_headers: Extra headers for this request
134
135
Returns:
136
HTTP response from coordinator
137
"""
138
139
def get(self, url: str) -> Response
140
"""
141
GET request to specified URL with session headers.
142
143
Parameters:
144
- url: Full URL to request
145
146
Returns:
147
HTTP response
148
"""
149
150
def delete(self, url: str) -> Response
151
"""
152
DELETE request to specified URL.
153
154
Parameters:
155
- url: Full URL to request
156
157
Returns:
158
HTTP response
159
"""
160
161
def process(self, http_response: Response) -> TrinoStatus
162
"""
163
Process HTTP response into TrinoStatus object.
164
165
Parameters:
166
- http_response: Raw HTTP response
167
168
Returns:
169
Parsed status information
170
"""
171
172
def unauthenticated(self) -> TrinoRequest
173
"""Create unauthenticated request instance for spooled segments."""
174
175
@property
176
def transaction_id(self) -> Optional[str]
177
"""Current transaction ID."""
178
179
@transaction_id.setter
180
def transaction_id(self, value: Optional[str]) -> None
181
182
@property
183
def http_headers(self) -> CaseInsensitiveDict[str]
184
"""Generated HTTP headers for requests."""
185
186
@property
187
def max_attempts(self) -> int
188
"""Maximum retry attempts."""
189
190
@max_attempts.setter
191
def max_attempts(self, value: int) -> None
192
193
@property
194
def statement_url(self) -> str
195
"""URL for statement submission."""
196
197
@property
198
def next_uri(self) -> Optional[str]
199
"""Next URI for query continuation."""
200
201
def get_url(self, path: str) -> str
202
"""Construct full URL for given path."""
203
204
@staticmethod
205
def raise_response_error(http_response: Response) -> None
206
"""Raise appropriate exception for HTTP error response."""
207
```
208
209
### Query Execution
210
211
High-level query execution with result streaming, status tracking, and cancellation support.
212
213
```python { .api }
214
class TrinoQuery:
215
def __init__(
216
self,
217
request: TrinoRequest,
218
query: str,
219
legacy_primitive_types: bool = False,
220
fetch_mode: Literal["mapped", "segments"] = "mapped"
221
)
222
223
def execute(self, additional_http_headers: Optional[Dict[str, Any]] = None) -> TrinoResult
224
"""
225
Execute the query and return result iterator.
226
227
Parameters:
228
- additional_http_headers: Extra headers for initial request
229
230
Returns:
231
TrinoResult iterator for consuming rows
232
"""
233
234
def fetch(self) -> List[Union[List[Any], Any]]
235
"""Fetch next batch of results from the query."""
236
237
def cancel(self) -> None
238
"""Cancel query execution."""
239
240
@property
241
def query_id(self) -> Optional[str]
242
"""Unique query identifier assigned by Trino."""
243
244
@property
245
def query(self) -> Optional[str]
246
"""SQL query text."""
247
248
@property
249
def columns(self) -> Optional[List[Dict[str, Any]]]
250
"""Column metadata for query results."""
251
252
@property
253
def stats(self) -> Dict[str, Any]
254
"""Query execution statistics."""
255
256
@property
257
def update_type(self) -> Optional[str]
258
"""Type of update operation if applicable."""
259
260
@property
261
def update_count(self) -> Optional[int]
262
"""Number of rows affected by update operations."""
263
264
@property
265
def warnings(self) -> List[Dict[str, Any]]
266
"""Query execution warnings."""
267
268
@property
269
def result(self) -> Optional[TrinoResult]
270
"""Result iterator object."""
271
272
@property
273
def info_uri(self) -> Optional[str]
274
"""URI for detailed query information."""
275
276
@property
277
def finished(self) -> bool
278
"""Whether query execution is complete."""
279
280
@property
281
def cancelled(self) -> bool
282
"""Whether query was cancelled."""
283
```
284
285
### Result Iteration
286
287
Iterator over query results with row-by-row streaming and automatic result fetching.
288
289
```python { .api }
290
class TrinoResult:
291
def __init__(self, query: TrinoQuery, rows: List[Any])
292
293
@property
294
def rows(self) -> List[Any]
295
"""Current batch of rows."""
296
297
@rows.setter
298
def rows(self, rows: List[Any]) -> None
299
300
@property
301
def rownumber(self) -> int
302
"""Current row number (1-based)."""
303
304
def __iter__(self) -> Iterator[List[Any]]
305
"""Iterator interface for consuming all rows."""
306
```
307
308
### Query Status Information
309
310
Structured representation of query execution status with comprehensive metadata.
311
312
```python { .api }
313
@dataclass
314
class TrinoStatus:
315
id: str
316
stats: Dict[str, str]
317
warnings: List[Any]
318
info_uri: str
319
next_uri: Optional[str]
320
update_type: Optional[str]
321
update_count: Optional[int]
322
rows: Union[List[Any], Dict[str, Any]]
323
columns: List[Any]
324
```
325
326
### Spooled Protocol Support
327
328
Advanced segment-based result handling for high-throughput scenarios with compression and remote storage.
329
330
```python { .api }
331
class Segment:
332
"""Abstract base class for data segments."""
333
def __init__(self, segment: Dict[str, Any])
334
335
@property
336
def data(self) -> bytes
337
"""Raw segment data."""
338
339
@property
340
def metadata(self) -> Dict[str, Any]
341
"""Segment metadata."""
342
343
class InlineSegment(Segment):
344
"""Segment with base64-encoded inline data."""
345
def __init__(self, segment: Dict[str, Any])
346
347
@property
348
def data(self) -> bytes
349
"""Decoded inline data."""
350
351
class SpooledSegment(Segment):
352
"""Segment with remote data retrieval."""
353
def __init__(self, segment: Dict[str, Any], request: TrinoRequest)
354
355
@property
356
def data(self) -> bytes
357
"""Data retrieved from remote URI."""
358
359
@property
360
def uri(self) -> str
361
"""URI for data retrieval."""
362
363
@property
364
def ack_uri(self) -> str
365
"""URI for acknowledgment."""
366
367
@property
368
def headers(self) -> Dict[str, List[str]]
369
"""Headers for data retrieval."""
370
371
def acknowledge(self) -> None
372
"""Acknowledge segment processing."""
373
374
class DecodableSegment:
375
"""Segment with encoding information."""
376
def __init__(self, encoding: str, metadata: Dict[str, Any], segment: Segment)
377
378
@property
379
def encoding(self) -> str
380
"""Data encoding format."""
381
382
@property
383
def segment(self) -> Segment
384
"""Underlying segment."""
385
386
@property
387
def metadata(self) -> Dict[str, Any]
388
"""Segment metadata."""
389
```
390
391
### Segment Iteration
392
393
Iterator for processing segments with automatic decompression and row mapping.
394
395
```python { .api }
396
class SegmentIterator:
397
def __init__(self, segments: Union[DecodableSegment, List[DecodableSegment]], mapper: RowMapper)
398
399
def __iter__(self) -> Iterator[List[Any]]
400
"""Iterator over mapped rows from segments."""
401
402
def __next__(self) -> List[Any]
403
"""Next mapped row."""
404
```
405
406
## Usage Examples
407
408
### Basic Low-Level Query
409
410
```python
411
from trino.client import TrinoRequest, TrinoQuery, ClientSession
412
413
# Create session
414
session = ClientSession(
415
user="testuser",
416
catalog="memory",
417
schema="default"
418
)
419
420
# Create request handler
421
request = TrinoRequest(
422
host="localhost",
423
port=8080,
424
client_session=session
425
)
426
427
# Execute query
428
query = TrinoQuery(request, "SELECT * FROM users")
429
result = query.execute()
430
431
# Consume results
432
for row in result:
433
print(row)
434
```
435
436
### Session Property Management
437
438
```python
439
from trino.client import ClientSession
440
441
session = ClientSession(
442
user="testuser",
443
catalog="hive",
444
schema="default"
445
)
446
447
# Set session properties
448
session.properties = {
449
"query_max_run_time": "1h",
450
"join_distribution_type": "BROADCAST"
451
}
452
453
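# Client tags are read-only (the client_tags property has no setter), so pass them to the constructor:
454
# session = ClientSession(user="testuser", catalog="hive", schema="default", client_tags=["analytics", "daily-report"])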
455
456
# Set roles
457
session.roles = {"hive": "admin", "system": "reader"}
458
```
459
460
### Custom HTTP Session
461
462
```python
463
import requests
464
from trino.client import TrinoRequest, ClientSession
465
466
# Create custom HTTP session
467
http_session = requests.Session()
468
http_session.cert = ('/path/to/client.cert', '/path/to/client.key')
469
470
session = ClientSession(user="testuser")
471
request = TrinoRequest(
472
host="secure.trino.example.com",
473
port=443,
474
client_session=session,
475
http_session=http_session,
476
http_scheme="https"
477
)
478
```
479
480
### Spooled Protocol Usage
481
482
```python
483
from trino.client import TrinoQuery, TrinoRequest, ClientSession
484
485
# Create session with spooled encoding
486
session = ClientSession(
487
user="testuser",
488
encoding=["json+zstd", "json+lz4", "json"]
489
)
490
491
request = TrinoRequest(host="localhost", port=8080, client_session=session)
492
493
# Query with segment fetch mode
494
query = TrinoQuery(request, "SELECT * FROM large_table", fetch_mode="segments")
495
result = query.execute()
496
497
# Process segments directly
498
for segment in result:
499
# segment is a DecodableSegment
500
print(f"Segment encoding: {segment.encoding}")
501
print(f"Segment metadata: {segment.metadata}")
502
```
503
504
### Query Cancellation
505
506
```python
507
import threading
508
from trino.client import TrinoQuery, TrinoRequest, ClientSession
509
510
session = ClientSession(user="testuser")
511
request = TrinoRequest(host="localhost", port=8080, client_session=session)
512
query = TrinoQuery(request, "SELECT * FROM very_large_table")
513
514
# Start query in background
515
def run_query():
516
result = query.execute()
517
for row in result:
518
if query.cancelled:
519
break
520
print(row)
521
522
query_thread = threading.Thread(target=run_query)
523
query_thread.start()
524
525
# Cancel after 10 seconds
526
threading.Timer(10.0, query.cancel).start()
527
```