Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.
Direct HTTP protocol implementation providing fine-grained control over Trino communication. This interface is useful for advanced use cases requiring custom session management, request handling, or integration with existing HTTP infrastructure.
Thread-safe session state management for Trino connections, handling user authentication, catalog/schema context, properties, headers, and transaction state.
class ClientSession:
def __init__(
self,
user: str,
authorization_user: Optional[str] = None,
catalog: Optional[str] = None,
schema: Optional[str] = None,
source: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
transaction_id: Optional[str] = None,
extra_credential: Optional[List[Tuple[str, str]]] = None,
client_tags: Optional[List[str]] = None,
roles: Optional[Union[Dict[str, str], str]] = None,
timezone: Optional[str] = None,
encoding: Optional[Union[str, List[str]]] = None
)
@property
def user(self) -> str
"""Primary user for query execution."""
@property
def authorization_user(self) -> Optional[str]
"""User for authorization (different from query user for impersonation)."""
@authorization_user.setter
def authorization_user(self, authorization_user: Optional[str]) -> None
@property
def catalog(self) -> Optional[str]
"""Default catalog for queries."""
@catalog.setter
def catalog(self, catalog: Optional[str]) -> None
@property
def schema(self) -> Optional[str]
"""Default schema for queries."""
@schema.setter
def schema(self, schema: Optional[str]) -> None
@property
def source(self) -> Optional[str]
"""Query source identifier."""
@property
def properties(self) -> Dict[str, str]
"""Session properties dictionary."""
@properties.setter
def properties(self, properties: Dict[str, str]) -> None
@property
def headers(self) -> Dict[str, str]
"""Additional HTTP headers."""
@property
def transaction_id(self) -> Optional[str]
"""Current transaction ID."""
@transaction_id.setter
def transaction_id(self, transaction_id: Optional[str]) -> None
@property
def extra_credential(self) -> Optional[List[Tuple[str, str]]]
"""Extra credential key-value pairs."""
@property
def client_tags(self) -> List[str]
"""Client tags for query identification."""
@property
def roles(self) -> Dict[str, str]
"""Authorization roles per catalog."""
@roles.setter
def roles(self, roles: Dict[str, str]) -> None
@property
def prepared_statements(self) -> Dict[str, str]
"""Prepared statement name to SQL mapping."""
@prepared_statements.setter
def prepared_statements(self, prepared_statements: Dict[str, str]) -> None
@property
def timezone(self) -> str
"""Session timezone."""
@property
def encoding(self) -> Union[str, List[str]]
"""Spooled protocol encoding preferences."""

Low-level HTTP request handling with automatic retry logic, authentication integration, and comprehensive error handling.
class TrinoRequest:
def __init__(
self,
host: str,
port: int,
client_session: ClientSession,
http_session: Optional[Session] = None,
http_scheme: Optional[str] = None,
auth: Optional[Authentication] = None,
max_attempts: int = 3,
request_timeout: Union[float, Tuple[float, float]] = 30.0,
handle_retry: _RetryWithExponentialBackoff = None,
verify: bool = True
)
def post(self, sql: str, additional_http_headers: Optional[Dict[str, Any]] = None) -> Response
"""
Submit SQL query to Trino coordinator.
Parameters:
- sql: SQL statement to execute
- additional_http_headers: Extra headers for this request
Returns:
HTTP response from coordinator
"""
def get(self, url: str) -> Response
"""
GET request to specified URL with session headers.
Parameters:
- url: Full URL to request
Returns:
HTTP response
"""
def delete(self, url: str) -> Response
"""
DELETE request to specified URL.
Parameters:
- url: Full URL to request
Returns:
HTTP response
"""
def process(self, http_response: Response) -> TrinoStatus
"""
Process HTTP response into TrinoStatus object.
Parameters:
- http_response: Raw HTTP response
Returns:
Parsed status information
"""
def unauthenticated(self) -> TrinoRequest
"""Create unauthenticated request instance for spooled segments."""
@property
def transaction_id(self) -> Optional[str]
"""Current transaction ID."""
@transaction_id.setter
def transaction_id(self, value: Optional[str]) -> None
@property
def http_headers(self) -> CaseInsensitiveDict[str]
"""Generated HTTP headers for requests."""
@property
def max_attempts(self) -> int
"""Maximum retry attempts."""
@max_attempts.setter
def max_attempts(self, value: int) -> None
@property
def statement_url(self) -> str
"""URL for statement submission."""
@property
def next_uri(self) -> Optional[str]
"""Next URI for query continuation."""
def get_url(self, path: str) -> str
"""Construct full URL for given path."""
@staticmethod
def raise_response_error(http_response: Response) -> None
"""Raise appropriate exception for HTTP error response."""

High-level query execution with result streaming, status tracking, and cancellation support.
class TrinoQuery:
def __init__(
self,
request: TrinoRequest,
query: str,
legacy_primitive_types: bool = False,
fetch_mode: Literal["mapped", "segments"] = "mapped"
)
def execute(self, additional_http_headers: Dict[str, Any] = None) -> TrinoResult
"""
Execute the query and return result iterator.
Parameters:
- additional_http_headers: Extra headers for initial request
Returns:
TrinoResult iterator for consuming rows
"""
def fetch(self) -> List[Union[List[Any], Any]]
"""Fetch next batch of results from the query."""
def cancel(self) -> None
"""Cancel query execution."""
@property
def query_id(self) -> Optional[str]
"""Unique query identifier assigned by Trino."""
@property
def query(self) -> Optional[str]
"""SQL query text."""
@property
def columns(self) -> Optional[List[Dict[str, Any]]]
"""Column metadata for query results."""
@property
def stats(self) -> Dict[str, Any]
"""Query execution statistics."""
@property
def update_type(self) -> Optional[str]
"""Type of update operation if applicable."""
@property
def update_count(self) -> Optional[int]
"""Number of rows affected by update operations."""
@property
def warnings(self) -> List[Dict[str, Any]]
"""Query execution warnings."""
@property
def result(self) -> Optional[TrinoResult]
"""Result iterator object."""
@property
def info_uri(self) -> Optional[str]
"""URI for detailed query information."""
@property
def finished(self) -> bool
"""Whether query execution is complete."""
@property
def cancelled(self) -> bool
"""Whether query was cancelled."""

Iterator over query results with row-by-row streaming and automatic result fetching.
class TrinoResult:
def __init__(self, query: TrinoQuery, rows: List[Any])
@property
def rows(self) -> List[Any]
"""Current batch of rows."""
@rows.setter
def rows(self, rows: List[Any]) -> None
@property
def rownumber(self) -> int
"""Current row number (1-based)."""
def __iter__(self) -> Iterator[List[Any]]
"""Iterator interface for consuming all rows."""

Structured representation of query execution status with comprehensive metadata.
@dataclass
class TrinoStatus:
id: str
stats: Dict[str, str]
warnings: List[Any]
info_uri: str
next_uri: Optional[str]
update_type: Optional[str]
update_count: Optional[int]
rows: Union[List[Any], Dict[str, Any]]
columns: List[Any]

Advanced segment-based result handling for high-throughput scenarios with compression and remote storage.
class Segment:
"""Abstract base class for data segments."""
def __init__(self, segment: Dict[str, Any])
@property
def data(self) -> bytes
"""Raw segment data."""
@property
def metadata(self) -> Dict[str, Any]
"""Segment metadata."""
class InlineSegment(Segment):
"""Segment with base64-encoded inline data."""
def __init__(self, segment: Dict[str, Any])
@property
def data(self) -> bytes
"""Decoded inline data."""
class SpooledSegment(Segment):
"""Segment with remote data retrieval."""
def __init__(self, segment: Dict[str, Any], request: TrinoRequest)
@property
def data(self) -> bytes
"""Data retrieved from remote URI."""
@property
def uri(self) -> str
"""URI for data retrieval."""
@property
def ack_uri(self) -> str
"""URI for acknowledgment."""
@property
def headers(self) -> Dict[str, List[str]]
"""Headers for data retrieval."""
def acknowledge(self) -> None
"""Acknowledge segment processing."""
class DecodableSegment:
"""Segment with encoding information."""
def __init__(self, encoding: str, metadata: Dict[str, Any], segment: Segment)
@property
def encoding(self) -> str
"""Data encoding format."""
@property
def segment(self) -> Segment
"""Underlying segment."""
@property
def metadata(self) -> Dict[str, Any]
"""Segment metadata."""

Iterator for processing segments with automatic decompression and row mapping.
class SegmentIterator:
def __init__(self, segments: Union[DecodableSegment, List[DecodableSegment]], mapper: RowMapper)
def __iter__(self) -> Iterator[List[Any]]
"""Iterator over mapped rows from segments."""
def __next__(self) -> List[Any]
"""Next mapped row."""

from trino.client import TrinoRequest, TrinoQuery, ClientSession
# Create session
session = ClientSession(
user="testuser",
catalog="memory",
schema="default"
)
# Create request handler
request = TrinoRequest(
host="localhost",
port=8080,
client_session=session
)
# Execute query
query = TrinoQuery(request, "SELECT * FROM users")
result = query.execute()
# Consume results
for row in result:
    print(row)

from trino.client import ClientSession
# Create the session. Client tags are supplied at construction time because
# ClientSession exposes client_tags as a read-only property (getter only),
# so assigning session.client_tags afterwards would raise AttributeError.
session = ClientSession(
    user="testuser",
    catalog="hive",
    schema="default",
    client_tags=["analytics", "daily-report"],
)
# Set session properties
session.properties = {
    "query_max_run_time": "1h",
    "join_distribution_type": "BROADCAST",
}
# Set roles
session.roles = {"hive": "admin", "system": "reader"}

import requests
from trino.client import TrinoRequest, ClientSession
# Create custom HTTP session
http_session = requests.Session()
http_session.cert = ('/path/to/client.cert', '/path/to/client.key')
session = ClientSession(user="testuser")
request = TrinoRequest(
host="secure.trino.example.com",
port=443,
client_session=session,
http_session=http_session,
http_scheme="https"
)

from trino.client import TrinoQuery, TrinoRequest, ClientSession
# Create session with spooled encoding
session = ClientSession(
user="testuser",
encoding=["json+zstd", "json+lz4", "json"]
)
request = TrinoRequest(host="localhost", port=8080, client_session=session)
# Query with segment fetch mode
query = TrinoQuery(request, "SELECT * FROM large_table", fetch_mode="segments")
result = query.execute()
# Process segments directly
for segment in result:
    # segment is a DecodableSegment
    print(f"Segment encoding: {segment.encoding}")
    print(f"Segment metadata: {segment.metadata}")

import threading
from trino.client import TrinoQuery, TrinoRequest, ClientSession
session = ClientSession(user="testuser")
request = TrinoRequest(host="localhost", port=8080, client_session=session)
query = TrinoQuery(request, "SELECT * FROM very_large_table")
# Start query in background
def run_query():
    """Execute the module-level ``query`` and print each row it yields.

    Intended to be used as a thread target: iteration stops early once
    ``query.cancelled`` becomes true (for example after ``query.cancel()``
    has been called from another thread).
    """
    result = query.execute()
    for row in result:
        # Stop consuming rows as soon as the query has been cancelled.
        if query.cancelled:
            break
        print(row)
query_thread = threading.Thread(target=run_query)
query_thread.start()
# Cancel after 10 seconds
threading.Timer(10.0, query.cancel).start()

Install with Tessl CLI
npx tessl i tessl/pypi-trino