Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.
Python Database API 2.0 compliant interface providing standard database connectivity patterns. This is the recommended interface for most applications as it follows established Python database standards and integrates seamlessly with existing database tooling.
Creates database connections with comprehensive configuration options including authentication, session properties, timeouts, and protocol settings.
def connect(
host: str,
port: int = None,
user: str = None,
source: str = "trino-python-client",
catalog: str = None,
schema: str = None,
session_properties: Dict[str, str] = None,
http_headers: Dict[str, str] = None,
http_scheme: str = None,
auth: Authentication = None,
extra_credential: List[Tuple[str, str]] = None,
max_attempts: int = 3,
request_timeout: float = 30.0,
isolation_level: IsolationLevel = IsolationLevel.AUTOCOMMIT,
verify: Union[bool, str] = True,
http_session: Session = None,
client_tags: List[str] = None,
legacy_primitive_types: bool = False,
legacy_prepared_statements: bool = None,
roles: Union[Dict[str, str], str] = None,
timezone: str = None,
encoding: Union[str, List[str]] = None
) -> Connection:
Parameters:
- host: Trino coordinator hostname or URL
- port: TCP port (default: 8080 for HTTP, 443 for HTTPS)
- user: Username for authentication and query attribution
- source: Query source identifier for tracking
- catalog: Default catalog for queries
- schema: Default schema for queries
- session_properties: Trino session configuration properties
- http_headers: Additional HTTP headers
- http_scheme: "http" or "https" (auto-detected from URL or port)
- auth: Authentication instance (BasicAuthentication, JWTAuthentication, etc.)
- extra_credential: Additional credentials as key-value pairs
- max_attempts: Maximum retry attempts for failed requests
- request_timeout: Request timeout in seconds
- isolation_level: Transaction isolation level
- verify: SSL certificate verification (boolean or path to CA bundle)
- http_session: Custom requests.Session instance
- client_tags: Tags for query identification
- legacy_primitive_types: Return string representations for edge-case values
- legacy_prepared_statements: Force legacy prepared statement protocol
- roles: Authorization roles for catalogs
- timezone: Session timezone (IANA timezone name)
- encoding: Spooled protocol encoding preference

Database connection object providing transaction control, cursor creation, and resource management with context manager support.
class Connection:
def __init__(
self,
host: str,
port: int = None,
user: str = None,
**kwargs
)
def cursor(
self,
cursor_style: str = "row",
legacy_primitive_types: bool = None
) -> Cursor
"""
Create a new cursor for executing queries.
Parameters:
- cursor_style: "row" for regular cursor, "segment" for spooled segments
- legacy_primitive_types: Override connection-level setting
Returns:
Cursor instance for query execution
"""
def commit(self) -> None
"""Commit the current transaction."""
def rollback(self) -> None
"""Rollback the current transaction."""
def close(self) -> None
"""Close the connection and release resources."""
def start_transaction(self) -> Transaction
"""Start a new transaction and return transaction object."""
@property
def isolation_level(self) -> IsolationLevel
"""Current transaction isolation level."""
@property
def transaction(self) -> Optional[Transaction]
"""Current transaction object if active."""
# Context manager support
def __enter__(self) -> Connection
def __exit__(self, exc_type, exc_value, traceback) -> None

Cursor object for executing SQL statements with support for parameterized queries, batch execution, and comprehensive result retrieval.
class Cursor:
def __init__(
self,
connection: Connection,
request: TrinoRequest,
legacy_primitive_types: bool = False
)
def execute(self, operation: str, params: List[Any] = None) -> Cursor
"""
Execute a SQL query with optional parameters.
Parameters:
- operation: SQL statement to execute
- params: Query parameters for prepared statement execution
Returns:
Self for method chaining
"""
def executemany(self, operation: str, seq_of_params: List[List[Any]]) -> Cursor
"""
Execute SQL query against multiple parameter sets.
Parameters:
- operation: SQL statement to execute
- seq_of_params: Sequence of parameter lists
Returns:
Self for method chaining
"""
def fetchone(self) -> Optional[List[Any]]
"""Fetch next row or None if no more data."""
def fetchmany(self, size: int = None) -> List[List[Any]]
"""
Fetch multiple rows.
Parameters:
- size: Number of rows to fetch (defaults to arraysize)
Returns:
List of rows (each row is a list of column values)
"""
def fetchall(self) -> List[List[Any]]
"""Fetch all remaining rows."""
def describe(self, sql: str) -> List[DescribeOutput]
"""
Get column information for a SQL statement without executing it.
Parameters:
- sql: SQL statement to analyze
Returns:
List of DescribeOutput objects with column metadata
"""
def cancel(self) -> None
"""Cancel the currently executing query."""
def close(self) -> None
"""Close the cursor and cancel any running queries."""
@property
def description(self) -> List[ColumnDescription]
"""Column descriptions for the last executed query."""
@property
def rowcount(self) -> int
"""Number of rows affected by last query (-1 if unknown)."""
@property
def arraysize(self) -> int
"""Default number of rows for fetchmany()."""
@arraysize.setter
def arraysize(self, size: int) -> None
@property
def query_id(self) -> Optional[str]
"""Trino query ID for the last executed query."""
@property
def query(self) -> Optional[str]
"""SQL text of the last executed query."""
@property
def stats(self) -> Optional[Dict[str, Any]]
"""Query execution statistics."""
@property
def warnings(self) -> Optional[List[Dict[str, Any]]]
"""Query execution warnings."""
@property
def info_uri(self) -> Optional[str]
"""URI for detailed query information."""
@property
def update_type(self) -> Optional[str]
"""Type of update operation (INSERT, UPDATE, DELETE, etc.)."""
# Context manager support
def __enter__(self) -> Cursor
def __exit__(self, exc_type, exc_value, traceback) -> None

Specialized cursor for handling spooled protocol segments, providing direct access to compressed data segments for high-performance scenarios.
class SegmentCursor(Cursor):
def __init__(
self,
connection: Connection,
request: TrinoRequest,
legacy_primitive_types: bool = False
)
def execute(self, operation: str, params: List[Any] = None) -> SegmentCursor
"""
Execute query and return segments instead of mapped rows.
Note: params are not supported for SegmentCursor.
"""

Data structures for query and column metadata with rich type information and result descriptions.
class ColumnDescription:
"""Column metadata following DB-API 2.0 specification."""
name: str
type_code: int
display_size: int
internal_size: int
precision: int
scale: int
null_ok: bool
@classmethod
def from_column(cls, column: Dict[str, Any]) -> ColumnDescription
class DescribeOutput:
"""Output column information from DESCRIBE statement."""
name: str
catalog: str
schema: str
table: str
type: str
type_size: int
aliased: bool
@classmethod
def from_row(cls, row: List[Any]) -> DescribeOutput

DB-API 2.0 compliance information and global configuration.
apilevel: str = "2.0"
threadsafety: int = 2
paramstyle: str = "qmark"

Type classification objects for result column type checking following DB-API 2.0 specification.
class DBAPITypeObject:
def __init__(self, *values: str)
def __eq__(self, other: str) -> bool
# Type categories
STRING: DBAPITypeObject # VARCHAR, CHAR, VARBINARY, JSON, IPADDRESS
BINARY: DBAPITypeObject # ARRAY, MAP, ROW, HyperLogLog, P4HyperLogLog, QDigest
NUMBER: DBAPITypeObject # BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, DECIMAL
DATETIME: DBAPITypeObject # DATE, TIME, TIMESTAMP, INTERVAL types
ROWID: DBAPITypeObject # (empty - Trino has no row IDs)

DB-API 2.0 date and time constructor functions for creating temporal values.
def DateFromTicks(ticks: float) -> date
"""Create date from Unix timestamp."""
def TimestampFromTicks(ticks: float) -> datetime
"""Create datetime from Unix timestamp."""
def TimeFromTicks(ticks: float) -> time
"""Create time from Unix timestamp."""
def Binary(string: str) -> bytes
"""Convert string to binary data."""
# Type aliases
Date = date
Time = time
Timestamp = datetime

from trino.dbapi import connect
conn = connect(
host="localhost",
port=8080,
user="testuser",
catalog="memory",
schema="default"
)
cur = conn.cursor()
cur.execute("SELECT name, age FROM users WHERE age > ?", [25])
# Fetch results
for row in cur:
print(f"Name: {row[0]}, Age: {row[1]}")
cur.close()
conn.close()

from trino.dbapi import connect
from trino.transaction import IsolationLevel
conn = connect(
host="localhost",
port=8080,
user="testuser",
catalog="memory",
schema="default",
isolation_level=IsolationLevel.READ_COMMITTED
)
with conn: # Auto-commit on success, rollback on exception
cur = conn.cursor()
cur.execute("INSERT INTO users VALUES (?, ?)", ["John", 30])
cur.execute("INSERT INTO users VALUES (?, ?)", ["Jane", 25])

from trino.dbapi import connect
conn = connect(host="localhost", port=8080, user="testuser")
cur = conn.cursor()
# Execute same query with multiple parameter sets
users_data = [
["Alice", 28],
["Bob", 32],
["Charlie", 24]
]
cur.executemany("INSERT INTO users VALUES (?, ?)", users_data)

from trino.dbapi import connect
conn = connect(host="localhost", port=8080, user="testuser")
cur = conn.cursor()
cur.execute("SELECT * FROM users LIMIT 0") # Get structure without data
# Access column information
for col in cur.description:
print(f"Column: {col.name}, Type: {col.type_code}")

Install with Tessl CLI
npx tessl i tessl/pypi-trino