CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-trino

Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.

Overview
Eval results
Files

docs/low-level-client.md

Low-Level Client

Direct HTTP protocol implementation providing fine-grained control over Trino communication. This interface is useful for advanced use cases requiring custom session management, request handling, or integration with existing HTTP infrastructure.

Capabilities

Client Session Management

Thread-safe session state management for Trino connections, handling user authentication, catalog/schema context, properties, headers, and transaction state.

class ClientSession:
    def __init__(
        self,
        user: str,
        authorization_user: Optional[str] = None,
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        source: Optional[str] = None,
        properties: Optional[Dict[str, str]] = None,
        headers: Optional[Dict[str, str]] = None,
        transaction_id: Optional[str] = None,
        extra_credential: Optional[List[Tuple[str, str]]] = None,
        client_tags: Optional[List[str]] = None,
        roles: Optional[Union[Dict[str, str], str]] = None,
        timezone: Optional[str] = None,
        encoding: Optional[Union[str, List[str]]] = None
    )
    
    @property
    def user(self) -> str
    """Primary user for query execution."""
    
    @property
    def authorization_user(self) -> Optional[str]
    """User for authorization (different from query user for impersonation)."""
    
    @authorization_user.setter
    def authorization_user(self, authorization_user: Optional[str]) -> None
    
    @property
    def catalog(self) -> Optional[str]
    """Default catalog for queries."""
    
    @catalog.setter
    def catalog(self, catalog: Optional[str]) -> None
    
    @property
    def schema(self) -> Optional[str]
    """Default schema for queries."""
    
    @schema.setter
    def schema(self, schema: Optional[str]) -> None
    
    @property
    def source(self) -> Optional[str]
    """Query source identifier."""
    
    @property
    def properties(self) -> Dict[str, str]
    """Session properties dictionary."""
    
    @properties.setter
    def properties(self, properties: Dict[str, str]) -> None
    
    @property
    def headers(self) -> Dict[str, str]
    """Additional HTTP headers."""
    
    @property
    def transaction_id(self) -> Optional[str]
    """Current transaction ID."""
    
    @transaction_id.setter
    def transaction_id(self, transaction_id: Optional[str]) -> None
    
    @property
    def extra_credential(self) -> Optional[List[Tuple[str, str]]]
    """Extra credential key-value pairs."""
    
    @property
    def client_tags(self) -> List[str]
    """Client tags for query identification."""
    
    @property
    def roles(self) -> Dict[str, str]
    """Authorization roles per catalog."""
    
    @roles.setter
    def roles(self, roles: Dict[str, str]) -> None
    
    @property
    def prepared_statements(self) -> Dict[str, str]
    """Prepared statement name to SQL mapping."""
    
    @prepared_statements.setter
    def prepared_statements(self, prepared_statements: Dict[str, str]) -> None
    
    @property
    def timezone(self) -> str
    """Session timezone."""
    
    @property
    def encoding(self) -> Union[str, List[str]]
    """Spooled protocol encoding preferences."""

HTTP Request Management

Low-level HTTP request handling with automatic retry logic, authentication integration, and comprehensive error handling.

class TrinoRequest:
    def __init__(
        self,
        host: str,
        port: int,
        client_session: ClientSession,
        http_session: Optional[Session] = None,
        http_scheme: Optional[str] = None,
        auth: Optional[Authentication] = None,
        max_attempts: int = 3,
        request_timeout: Union[float, Tuple[float, float]] = 30.0,
        handle_retry: _RetryWithExponentialBackoff = None,
        verify: bool = True
    )
    
    def post(self, sql: str, additional_http_headers: Optional[Dict[str, Any]] = None) -> Response
    """
    Submit SQL query to Trino coordinator.
    
    Parameters:
    - sql: SQL statement to execute
    - additional_http_headers: Extra headers for this request
    
    Returns:
    HTTP response from coordinator
    """
    
    def get(self, url: str) -> Response
    """
    GET request to specified URL with session headers.
    
    Parameters:
    - url: Full URL to request
    
    Returns:
    HTTP response
    """
    
    def delete(self, url: str) -> Response
    """
    DELETE request to specified URL.
    
    Parameters:
    - url: Full URL to request
    
    Returns:
    HTTP response
    """
    
    def process(self, http_response: Response) -> TrinoStatus
    """
    Process HTTP response into TrinoStatus object.
    
    Parameters:
    - http_response: Raw HTTP response
    
    Returns:
    Parsed status information
    """
    
    def unauthenticated(self) -> TrinoRequest
    """Create unauthenticated request instance for spooled segments."""
    
    @property
    def transaction_id(self) -> Optional[str]
    """Current transaction ID."""
    
    @transaction_id.setter
    def transaction_id(self, value: Optional[str]) -> None
    
    @property
    def http_headers(self) -> CaseInsensitiveDict[str]
    """Generated HTTP headers for requests."""
    
    @property
    def max_attempts(self) -> int
    """Maximum retry attempts."""
    
    @max_attempts.setter
    def max_attempts(self, value: int) -> None
    
    @property
    def statement_url(self) -> str
    """URL for statement submission."""
    
    @property
    def next_uri(self) -> Optional[str]
    """Next URI for query continuation."""
    
    def get_url(self, path: str) -> str
    """Construct full URL for given path."""
    
    @staticmethod
    def raise_response_error(http_response: Response) -> None
    """Raise appropriate exception for HTTP error response."""

Query Execution

High-level query execution with result streaming, status tracking, and cancellation support.

class TrinoQuery:
    def __init__(
        self,
        request: TrinoRequest,
        query: str,
        legacy_primitive_types: bool = False,
        fetch_mode: Literal["mapped", "segments"] = "mapped"
    )
    
    def execute(self, additional_http_headers: Optional[Dict[str, Any]] = None) -> TrinoResult
    """
    Execute the query and return result iterator.
    
    Parameters:
    - additional_http_headers: Extra headers for initial request
    
    Returns:
    TrinoResult iterator for consuming rows
    """
    
    def fetch(self) -> List[Union[List[Any], Any]]
    """Fetch next batch of results from the query."""
    
    def cancel(self) -> None
    """Cancel query execution."""
    
    @property
    def query_id(self) -> Optional[str]
    """Unique query identifier assigned by Trino."""
    
    @property
    def query(self) -> Optional[str]
    """SQL query text."""
    
    @property
    def columns(self) -> Optional[List[Dict[str, Any]]]
    """Column metadata for query results."""
    
    @property
    def stats(self) -> Dict[str, Any]
    """Query execution statistics."""
    
    @property
    def update_type(self) -> Optional[str]
    """Type of update operation if applicable."""
    
    @property
    def update_count(self) -> Optional[int]
    """Number of rows affected by update operations."""
    
    @property
    def warnings(self) -> List[Dict[str, Any]]
    """Query execution warnings."""
    
    @property
    def result(self) -> Optional[TrinoResult]
    """Result iterator object."""
    
    @property
    def info_uri(self) -> Optional[str]
    """URI for detailed query information."""
    
    @property
    def finished(self) -> bool
    """Whether query execution is complete."""
    
    @property
    def cancelled(self) -> bool
    """Whether query was cancelled."""

Result Iteration

Iterator over query results with row-by-row streaming and automatic result fetching.

class TrinoResult:
    def __init__(self, query: TrinoQuery, rows: List[Any])
    
    @property
    def rows(self) -> List[Any]
    """Current batch of rows."""
    
    @rows.setter
    def rows(self, rows: List[Any]) -> None
    
    @property
    def rownumber(self) -> int
    """Current row number (1-based)."""
    
    def __iter__(self) -> Iterator[List[Any]]
    """Iterator interface for consuming all rows."""

Query Status Information

Structured representation of query execution status with comprehensive metadata.

@dataclass
class TrinoStatus:
    id: str
    stats: Dict[str, str]
    warnings: List[Any]
    info_uri: str
    next_uri: Optional[str]
    update_type: Optional[str]
    update_count: Optional[int]
    rows: Union[List[Any], Dict[str, Any]]
    columns: List[Any]

Spooled Protocol Support

Advanced segment-based result handling for high-throughput scenarios with compression and remote storage.

class Segment:
    """Abstract base class for data segments."""
    def __init__(self, segment: Dict[str, Any])
    
    @property
    def data(self) -> bytes
    """Raw segment data."""
    
    @property
    def metadata(self) -> Dict[str, Any]
    """Segment metadata."""

class InlineSegment(Segment):
    """Segment with base64-encoded inline data."""
    def __init__(self, segment: Dict[str, Any])
    
    @property
    def data(self) -> bytes
    """Decoded inline data."""

class SpooledSegment(Segment):
    """Segment with remote data retrieval."""
    def __init__(self, segment: Dict[str, Any], request: TrinoRequest)
    
    @property
    def data(self) -> bytes
    """Data retrieved from remote URI."""
    
    @property
    def uri(self) -> str
    """URI for data retrieval."""
    
    @property
    def ack_uri(self) -> str
    """URI for acknowledgment."""
    
    @property
    def headers(self) -> Dict[str, List[str]]
    """Headers for data retrieval."""
    
    def acknowledge(self) -> None
    """Acknowledge segment processing."""

class DecodableSegment:
    """Segment with encoding information."""
    def __init__(self, encoding: str, metadata: Dict[str, Any], segment: Segment)
    
    @property
    def encoding(self) -> str
    """Data encoding format."""
    
    @property
    def segment(self) -> Segment
    """Underlying segment."""
    
    @property
    def metadata(self) -> Dict[str, Any]
    """Segment metadata."""

Segment Iteration

Iterator for processing segments with automatic decompression and row mapping.

class SegmentIterator:
    def __init__(self, segments: Union[DecodableSegment, List[DecodableSegment]], mapper: RowMapper)
    
    def __iter__(self) -> Iterator[List[Any]]
    """Iterator over mapped rows from segments."""
    
    def __next__(self) -> List[Any]
    """Next mapped row."""

Usage Examples

Basic Low-Level Query

from trino.client import TrinoRequest, TrinoQuery, ClientSession

# Create session
session = ClientSession(
    user="testuser",
    catalog="memory",
    schema="default"
)

# Create request handler
request = TrinoRequest(
    host="localhost",
    port=8080,
    client_session=session
)

# Execute query
query = TrinoQuery(request, "SELECT * FROM users")
result = query.execute()

# Consume results
for row in result:
    print(row)

Session Property Management

from trino.client import ClientSession

session = ClientSession(
    user="testuser",
    catalog="hive",
    schema="default",
    # client_tags is exposed as a read-only property (no setter),
    # so client tags must be supplied at construction time
    client_tags=["analytics", "daily-report"]
)

# Set session properties
session.properties = {
    "query_max_run_time": "1h",
    "join_distribution_type": "BROADCAST"
}

# Set roles
session.roles = {"hive": "admin", "system": "reader"}

Custom HTTP Session

import requests
from trino.client import TrinoRequest, ClientSession

# Create custom HTTP session
http_session = requests.Session()
http_session.cert = ('/path/to/client.cert', '/path/to/client.key')

session = ClientSession(user="testuser")
request = TrinoRequest(
    host="secure.trino.example.com",
    port=443,
    client_session=session,
    http_session=http_session,
    http_scheme="https"
)

Spooled Protocol Usage

from trino.client import TrinoQuery, TrinoRequest, ClientSession

# Create session with spooled encoding
session = ClientSession(
    user="testuser",
    encoding=["json+zstd", "json+lz4", "json"]
)

request = TrinoRequest(host="localhost", port=8080, client_session=session)

# Query with segment fetch mode
query = TrinoQuery(request, "SELECT * FROM large_table", fetch_mode="segments")
result = query.execute()

# Process segments directly
for segment in result:
    # segment is a DecodableSegment
    print(f"Segment encoding: {segment.encoding}")
    print(f"Segment metadata: {segment.metadata}")

Query Cancellation

import threading
from trino.client import TrinoQuery, TrinoRequest, ClientSession

session = ClientSession(user="testuser")
request = TrinoRequest(host="localhost", port=8080, client_session=session)
query = TrinoQuery(request, "SELECT * FROM very_large_table")

# Start query in background
def run_query():
    result = query.execute()
    for row in result:
        if query.cancelled:
            break
        print(row)

query_thread = threading.Thread(target=run_query)
query_thread.start()

# Cancel after 10 seconds
threading.Timer(10.0, query.cancel).start()

Install with Tessl CLI

npx tessl i tessl/pypi-trino

docs

authentication.md

db-api.md

index.md

low-level-client.md

sqlalchemy.md

tile.json