CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-trino

Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.

Overview
Eval results
Files

docs/db-api.md

DB-API 2.0 Interface

Python Database API 2.0 compliant interface providing standard database connectivity patterns. This is the recommended interface for most applications as it follows established Python database standards and integrates seamlessly with existing database tooling.

Capabilities

Connection Factory

Creates database connections with comprehensive configuration options including authentication, session properties, timeouts, and protocol settings.

def connect(
    host: str,
    port: int = None,
    user: str = None,
    source: str = "trino-python-client",
    catalog: str = None,
    schema: str = None,
    session_properties: Dict[str, str] = None,
    http_headers: Dict[str, str] = None,
    http_scheme: str = None,
    auth: Authentication = None,
    extra_credential: List[Tuple[str, str]] = None,
    max_attempts: int = 3,
    request_timeout: float = 30.0,
    isolation_level: IsolationLevel = IsolationLevel.AUTOCOMMIT,
    verify: bool = True,
    http_session: Session = None,
    client_tags: List[str] = None,
    legacy_primitive_types: bool = False,
    legacy_prepared_statements: bool = None,
    roles: Union[Dict[str, str], str] = None,
    timezone: str = None,
    encoding: Union[str, List[str]] = None
) -> Connection

Parameters:

  • host: Trino coordinator hostname or URL
  • port: TCP port (default: 8080 for HTTP, 443 for HTTPS)
  • user: Username for authentication and query attribution
  • source: Query source identifier for tracking
  • catalog: Default catalog for queries
  • schema: Default schema for queries
  • session_properties: Trino session configuration properties
  • http_headers: Additional HTTP headers
  • http_scheme: "http" or "https" (auto-detected from URL or port)
  • auth: Authentication instance (BasicAuthentication, JWTAuthentication, etc.)
  • extra_credential: Additional credentials as key-value pairs
  • max_attempts: Maximum retry attempts for failed requests
  • request_timeout: Request timeout in seconds
  • isolation_level: Transaction isolation level
  • verify: SSL certificate verification (boolean or path to CA bundle)
  • http_session: Custom requests.Session instance
  • client_tags: Tags for query identification
  • legacy_primitive_types: Return string representations for edge-case values
  • legacy_prepared_statements: Force legacy prepared statement protocol
  • roles: Authorization roles for catalogs
  • timezone: Session timezone (IANA timezone name)
  • encoding: Spooled protocol encoding preference

Connection Management

Database connection object providing transaction control, cursor creation, and resource management with context manager support.

# DB-API connection object: creates cursors and controls transactions
# (commit / rollback), and can be used as a context manager.
class Connection:
    # host, port and user are explicit; all other options arrive as keyword
    # arguments. NOTE(review): presumably these are the same options that
    # connect() accepts -- confirm against the implementation.
    def __init__(
        self,
        host: str,
        port: int = None,
        user: str = None,
        **kwargs
    )
    
    def cursor(
        self,
        cursor_style: str = "row",
        legacy_primitive_types: bool = None
    ) -> Cursor
    """
    Create a new cursor for executing queries.
    
    Parameters:
    - cursor_style: "row" for regular cursor, "segment" for spooled segments
    - legacy_primitive_types: Override connection-level setting
    
    Returns:
    Cursor instance for query execution
    """
    
    def commit(self) -> None
    """Commit the current transaction."""
    
    def rollback(self) -> None
    """Rollback the current transaction."""
    
    def close(self) -> None
    """Close the connection and release resources."""
    
    def start_transaction(self) -> Transaction
    """Start a new transaction and return transaction object."""
    
    @property
    def isolation_level(self) -> IsolationLevel
    """Current transaction isolation level."""
    
    @property
    def transaction(self) -> Optional[Transaction]
    """Current transaction object if active."""
    
    # Context manager support
    # __enter__ hands back a Connection for use in a `with` block. Per the
    # "Transaction Management" usage example, leaving the block commits on
    # success and rolls back when an exception is raised.
    def __enter__(self) -> Connection
    def __exit__(self, exc_type, exc_value, traceback) -> None

Query Execution

Cursor object for executing SQL statements with support for parameterized queries, batch execution, and comprehensive result retrieval.

# DB-API cursor: executes SQL statements (optionally parameterized or in
# batches) and exposes the result rows plus per-query metadata.
class Cursor:
    # Ties the cursor to the Connection that created it.
    # NOTE(review): `request` is presumably the low-level TrinoRequest used to
    # talk to the coordinator -- confirm in the low-level client docs.
    def __init__(
        self,
        connection: Connection,
        request: TrinoRequest,
        legacy_primitive_types: bool = False
    )
    
    # Placeholders in `operation` use the "?" (qmark) style, matching the
    # module-level `paramstyle` and the usage examples in this document.
    def execute(self, operation: str, params: List[Any] = None) -> Cursor
    """
    Execute a SQL query with optional parameters.
    
    Parameters:
    - operation: SQL statement to execute
    - params: Query parameters for prepared statement execution
    
    Returns:
    Self for method chaining
    """
    
    def executemany(self, operation: str, seq_of_params: List[List[Any]]) -> Cursor
    """
    Execute SQL query against multiple parameter sets.
    
    Parameters:
    - operation: SQL statement to execute
    - seq_of_params: Sequence of parameter lists
    
    Returns:
    Self for method chaining
    """
    
    def fetchone(self) -> Optional[List[Any]]
    """Fetch next row or None if no more data."""
    
    def fetchmany(self, size: int = None) -> List[List[Any]]
    """
    Fetch multiple rows.
    
    Parameters:
    - size: Number of rows to fetch (defaults to arraysize)
    
    Returns:
    List of rows (each row is a list of column values)
    """
    
    def fetchall(self) -> List[List[Any]]
    """Fetch all remaining rows."""
    
    def describe(self, sql: str) -> List[DescribeOutput]
    """
    Get column information for a SQL statement without executing it.
    
    Parameters:
    - sql: SQL statement to analyze
    
    Returns:
    List of DescribeOutput objects with column metadata
    """
    
    def cancel(self) -> None
    """Cancel the currently executing query."""
    
    def close(self) -> None
    """Close the cursor and cancel any running queries."""
    
    @property
    def description(self) -> List[ColumnDescription]
    """Column descriptions for the last executed query."""
    
    @property
    def rowcount(self) -> int
    """Number of rows affected by last query (-1 if unknown)."""
    
    @property
    def arraysize(self) -> int
    """Default number of rows for fetchmany()."""
    
    # Setter counterpart of `arraysize`: changes the default batch size that
    # fetchmany() uses when no explicit `size` is given.
    @arraysize.setter
    def arraysize(self, size: int) -> None
    
    @property
    def query_id(self) -> Optional[str]
    """Trino query ID for the last executed query."""
    
    @property
    def query(self) -> Optional[str]
    """SQL text of the last executed query."""
    
    @property
    def stats(self) -> Optional[Dict[str, Any]]
    """Query execution statistics."""
    
    @property
    def warnings(self) -> Optional[List[Dict[str, Any]]]
    """Query execution warnings."""
    
    @property
    def info_uri(self) -> Optional[str]
    """URI for detailed query information."""
    
    @property
    def update_type(self) -> Optional[str]
    """Type of update operation (INSERT, UPDATE, DELETE, etc.)."""
    
    # Context manager support
    # NOTE(review): presumably __exit__ closes the cursor -- confirm against
    # the implementation before relying on it for cleanup.
    def __enter__(self) -> Cursor
    def __exit__(self, exc_type, exc_value, traceback) -> None

Segment Cursor

Specialized cursor for handling spooled protocol segments, providing direct access to compressed data segments for high-performance scenarios.

# Cursor subclass for the spooled protocol: yields segments rather than rows.
# NOTE(review): presumably this is what Connection.cursor(cursor_style="segment")
# returns -- confirm against the implementation.
class SegmentCursor(Cursor):
    # Same constructor arguments as Cursor.
    def __init__(
        self,
        connection: Connection,
        request: TrinoRequest,
        legacy_primitive_types: bool = False
    )
    
    # `params` is kept in the signature so it matches Cursor.execute, but
    # parameterized queries are not supported here (see the note below).
    def execute(self, operation: str, params: List[Any] = None) -> SegmentCursor
    """
    Execute query and return segments instead of mapped rows.
    Note: params are not supported for SegmentCursor.
    """

Metadata Objects

Data structures for query and column metadata with rich type information and result descriptions.

# One entry of Cursor.description. The seven fields are the items that
# DB-API 2.0 (PEP 249) defines for a column description, in the same order.
class ColumnDescription:
    """Column metadata following DB-API 2.0 specification."""
    name: str  # column name
    type_code: int  # column type; PEP 249 compares it against the module's type objects
    display_size: int
    internal_size: int
    precision: int
    scale: int
    null_ok: bool  # whether the column may contain NULL
    
    # Alternate constructor: builds a ColumnDescription from a column dict.
    # NOTE(review): the expected keys of `column` are not shown here -- verify
    # against the caller.
    @classmethod
    def from_column(cls, column: Dict[str, Any]) -> ColumnDescription

# Element type of the list returned by Cursor.describe(); one instance per
# output column of the analyzed statement.
class DescribeOutput:
    """Output column information from a DESCRIBE OUTPUT statement."""
    name: str  # output column name
    catalog: str  # catalog the column originates from
    schema: str  # schema the column originates from
    table: str  # table the column originates from
    type: str  # Trino type name of the column
    type_size: int
    aliased: bool  # whether the column is aliased in the statement
    
    # Alternate constructor: builds a DescribeOutput from one result row.
    # NOTE(review): presumably the row values are in the field order declared
    # above -- confirm against the implementation.
    @classmethod
    def from_row(cls, row: List[Any]) -> DescribeOutput

Global Module Attributes

DB-API 2.0 compliance information and global configuration.

# Module globals required by DB-API 2.0 (PEP 249).
apilevel: str = "2.0"  # DB-API specification level implemented
threadsafety: int = 2  # PEP 249 level 2: threads may share the module and connections
paramstyle: str = "qmark"  # parameters are written as "?" placeholders

Type Objects

Type classification objects for result column type checking following DB-API 2.0 specification.

# Groups several Trino type names into one DB-API type category.
# NOTE(review): presumably __eq__ returns True when `other` is one of the
# names passed to the constructor -- confirm against the implementation.
# NOTE(review): ColumnDescription.type_code is annotated `int`, yet these
# objects compare against `str`; confirm which annotation is right.
class DBAPITypeObject:
    def __init__(self, *values: str)
    def __eq__(self, other: str) -> bool

# Type categories
# Each trailing comment lists the Trino types that belong to the category.
STRING: DBAPITypeObject  # VARCHAR, CHAR, VARBINARY, JSON, IPADDRESS
BINARY: DBAPITypeObject  # ARRAY, MAP, ROW, HyperLogLog, P4HyperLogLog, QDigest
NUMBER: DBAPITypeObject  # BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, DECIMAL
DATETIME: DBAPITypeObject  # DATE, TIME, TIMESTAMP, INTERVAL types
ROWID: DBAPITypeObject  # (empty - Trino has no row IDs)

Date/Time Constructors

DB-API 2.0 date and time constructor functions for creating temporal values.

# Constructors required by DB-API 2.0 (PEP 249). `ticks` is a number of
# seconds since the Unix epoch, as in Python's time.time().
def DateFromTicks(ticks: float) -> date
"""Create date from Unix timestamp."""

def TimestampFromTicks(ticks: float) -> datetime
"""Create datetime from Unix timestamp."""

def TimeFromTicks(ticks: float) -> time
"""Create time from Unix timestamp."""

def Binary(string: str) -> bytes
"""Convert string to binary data."""

# Type aliases
# The DB-API constructor names Date / Time / Timestamp are bound directly to
# date / time / datetime (presumably the standard-library datetime classes).
Date = date
Time = time
Timestamp = datetime

Usage Examples

Basic Query Execution

from trino.dbapi import connect

# Connect to a local coordinator. Setting catalog and schema lets the query
# below refer to the table by its unqualified name.
conn = connect(
    host="localhost",
    port=8080,
    user="testuser",
    catalog="memory",
    schema="default"
)

# try/finally guarantees the cursor and the connection are released even when
# the query or the row iteration raises.
try:
    cur = conn.cursor()
    try:
        # "?" is a qmark placeholder; its value is passed separately as a list.
        cur.execute("SELECT name, age FROM users WHERE age > ?", [25])

        # Fetch results
        for row in cur:
            print(f"Name: {row[0]}, Age: {row[1]}")
    finally:
        cur.close()
finally:
    conn.close()

Transaction Management

from trino.dbapi import connect
from trino.transaction import IsolationLevel

# A non-default isolation level is passed here; connect() defaults to
# IsolationLevel.AUTOCOMMIT.
conn = connect(
    host="localhost",
    port=8080,
    user="testuser",
    catalog="memory",
    schema="default",
    isolation_level=IsolationLevel.READ_COMMITTED
)

# Both INSERT statements run inside the same `with conn:` block.
with conn:  # Auto-commit on success, rollback on exception
    cur = conn.cursor()
    cur.execute("INSERT INTO users VALUES (?, ?)", ["John", 30])
    cur.execute("INSERT INTO users VALUES (?, ?)", ["Jane", 25])

Batch Operations

from trino.dbapi import connect

# catalog and schema are set so the unqualified table name "users" resolves,
# as in the other examples.
conn = connect(
    host="localhost",
    port=8080,
    user="testuser",
    catalog="memory",
    schema="default"
)
cur = conn.cursor()

# Execute same query with multiple parameter sets
users_data = [
    ["Alice", 28],
    ["Bob", 32],
    ["Charlie", 24]
]

# Each inner list supplies the values for the two "?" placeholders of one row.
cur.executemany("INSERT INTO users VALUES (?, ?)", users_data)

Column Metadata Access

from trino.dbapi import connect

# catalog and schema are set so the unqualified table name "users" resolves,
# as in the other examples.
conn = connect(
    host="localhost",
    port=8080,
    user="testuser",
    catalog="memory",
    schema="default"
)
cur = conn.cursor()
cur.execute("SELECT * FROM users LIMIT 0")  # Get structure without data

# Access column information
# cur.description holds one ColumnDescription per result column.
for col in cur.description:
    print(f"Column: {col.name}, Type: {col.type_code}")

Install with Tessl CLI

npx tessl i tessl/pypi-trino

docs

authentication.md

db-api.md

index.md

low-level-client.md

sqlalchemy.md

tile.json