CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-trino

Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.

Overview
Eval results
Files

docs/sqlalchemy.md

SQLAlchemy Integration

SQLAlchemy dialect implementation enabling ORM usage, connection pooling, and SQL expression language support with Trino. Provides seamless integration with existing SQLAlchemy applications and frameworks.

Capabilities

Engine Creation

Standard SQLAlchemy engine creation with Trino-specific URL schemes and connection parameters.

def create_engine(url: str, **kwargs) -> Engine
"""
Create SQLAlchemy engine for Trino connections.

Parameters:
- url: Connection URL in format 'trino://user:password@host:port/catalog/schema'
- **kwargs: Additional engine parameters including connect_args

Returns:
SQLAlchemy Engine instance
"""

URL Factory

Utility function for programmatically constructing Trino connection URLs with proper parameter encoding and validation.

def URL(
    host: str,
    port: Optional[int] = 8080,
    user: Optional[str] = None,
    password: Optional[str] = None,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
    source: Optional[str] = "trino-sqlalchemy",
    session_properties: Dict[str, str] = None,
    http_headers: Dict[str, Union[str, int]] = None,
    extra_credential: Optional[List[Tuple[str, str]]] = None,
    client_tags: Optional[List[str]] = None,
    legacy_primitive_types: Optional[bool] = None,
    legacy_prepared_statements: Optional[bool] = None,
    access_token: Optional[str] = None,
    cert: Optional[str] = None,
    key: Optional[str] = None,
    verify: Optional[bool] = None,
    roles: Optional[Dict[str, str]] = None
) -> str
"""
Create properly encoded Trino SQLAlchemy connection URL.

Parameters:
- host: Trino coordinator hostname (required)
- port: TCP port (required; defaults to 8080 when omitted)
- user: Username for connection
- password: Password for basic authentication (requires user)
- catalog: Default catalog
- schema: Default schema (requires catalog)
- source: Client source identifier
- session_properties: Session configuration properties
- http_headers: Additional HTTP headers
- extra_credential: Extra credential key-value pairs
- client_tags: Client tags for query identification
- legacy_primitive_types: Use string representations for edge cases
- legacy_prepared_statements: Force legacy prepared statement protocol
- access_token: JWT token for authentication
- cert: Path to client certificate file
- key: Path to private key file
- verify: SSL certificate verification setting
- roles: Authorization roles per catalog

Returns:
Properly encoded connection URL string
"""

Dialect Registration

The Trino dialect is automatically registered with SQLAlchemy under the "trino" scheme.

# Dialect automatically registered as:
# "trino" -> "trino.sqlalchemy.dialect:TrinoDialect"

Connection URL Formats

Basic Connection

# Minimal connection
'trino://user@host:port/catalog'

# With schema
'trino://user@host:port/catalog/schema'

# With password (Basic auth)
'trino://user:password@host:port/catalog/schema'

URL Parameters

Connection parameters can be passed via URL query string:

# Session properties
'trino://user@host:port/catalog?session_properties={"query_max_run_time": "1h"}'

# Client tags
'trino://user@host:port/catalog?client_tags=["tag1", "tag2"]'

# Roles
'trino://user@host:port/catalog?roles={"catalog1": "role1"}'

# JWT token
'trino://user@host:port/catalog?access_token=jwt_token_here'

# Certificate authentication
'trino://user@host:port/catalog?cert=/path/to/cert.pem&key=/path/to/key.pem'

Usage Examples

Basic Engine Creation

from sqlalchemy import create_engine

# Catalog only
engine = create_engine(
    'trino://testuser@localhost:8080/memory',
)

# Catalog followed by a schema
engine = create_engine(
    'trino://testuser@localhost:8080/memory/default',
)

# Remote coordinator reached over HTTPS (port 443)
engine = create_engine(
    'trino://testuser@trino.example.com:443/hive',
)

Authentication Examples

from sqlalchemy import create_engine
from trino.auth import BasicAuthentication, JWTAuthentication, OAuth2Authentication

# Credentials embedded directly in the URL (Basic authentication)
engine = create_engine('trino://alice:password@trino.example.com:443/hive')

# Basic authentication supplied as an auth object through connect_args
basic_args = {
    "auth": BasicAuthentication("alice", "password"),
    "http_scheme": "https",
}
engine = create_engine(
    'trino://alice@trino.example.com:443/hive',
    connect_args=basic_args,
)

# JWT authentication: the token is handed to the auth object
jwt_args = {
    "auth": JWTAuthentication("jwt_token_here"),
    "http_scheme": "https",
}
engine = create_engine(
    'trino://alice@trino.example.com:443/hive',
    connect_args=jwt_args,
)

# OAuth2 authentication: the auth object takes no arguments here
oauth2_args = {
    "auth": OAuth2Authentication(),
    "http_scheme": "https",
}
engine = create_engine(
    'trino://alice@trino.example.com:443/hive',
    connect_args=oauth2_args,
)

URL Factory Usage

from trino.sqlalchemy import URL
from sqlalchemy import create_engine

# Collect the connection settings first, then let the factory
# assemble and encode the URL from them.
connection_settings = {
    "host": "trino.example.com",
    "port": 443,
    "user": "alice",
    "catalog": "hive",
    "schema": "warehouse",
    "client_tags": ["analytics", "prod"],
}
url = URL(**connection_settings)

engine = create_engine(url)

Connection Configuration

from sqlalchemy import create_engine

# Session properties sent with the connection
session_properties = {
    "query_max_run_time": "1h",
    "join_distribution_type": "BROADCAST",
}

# Role to assume in each catalog
roles = {"hive": "admin", "system": "reader"}

# Everything in connect_args is forwarded to the underlying Trino connection
connect_args = {
    "session_properties": session_properties,
    "client_tags": ["analytics", "dashboard"],
    "roles": roles,
    "timezone": "America/New_York",
    "http_scheme": "https",
}

engine = create_engine(
    'trino://alice@trino.example.com:443/hive',
    connect_args=connect_args,
)

Basic Query Execution

from sqlalchemy import create_engine, text

engine = create_engine('trino://testuser@localhost:8080/memory')

# Text query execution
with engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM system.runtime.nodes"))
    for row in result:
        # Row objects behave like named tuples, not mappings, so dict(row)
        # fails on SQLAlchemy 2.0. The mapping view is exposed as
        # row._mapping (available since SQLAlchemy 1.4).
        print(dict(row._mapping))

Table Reflection

from sqlalchemy import create_engine, MetaData, Table

engine = create_engine('trino://testuser@localhost:8080/memory')
metadata = MetaData()

# autoload_with asks SQLAlchemy to load the column definitions of
# default.users from the database through the engine
users_table = Table('users', metadata, schema='default', autoload_with=engine)

# Print the name and type of every reflected column
for column in users_table.columns:
    print(f"Column: {column.name}, Type: {column.type}")

SQL Expression Language

from sqlalchemy import create_engine, MetaData, Table, select

engine = create_engine('trino://testuser@localhost:8080/memory')
metadata = MetaData()

# Reflect default.users so its columns can be referenced via users.c
users = Table(
    'users',
    metadata,
    schema='default',
    autoload_with=engine,
)

# SELECT name, age FROM users WHERE age > 25
older_than_25 = users.c.age > 25
query = select(users.c.name, users.c.age).where(older_than_25)

# Run the statement and print each returned row
with engine.connect() as connection:
    result = connection.execute(query)
    for row in result:
        print(f"Name: {row.name}, Age: {row.age}")

ORM Usage

from sqlalchemy import create_engine, Column, Integer, String
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4; the old
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

engine = create_engine('trino://testuser@localhost:8080/memory')
Base = declarative_base()


class User(Base):
    """ORM mapping for the ``users`` table in the ``default`` schema."""

    __tablename__ = 'users'
    __table_args__ = {'schema': 'default'}

    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)


Session = sessionmaker(bind=engine)

# Query using ORM. The context manager closes the session (and releases its
# connection) even if the query raises.
with Session() as session:
    users = session.query(User).filter(User.age > 25).all()
    for user in users:
        print(f"Name: {user.name}, Age: {user.age}")

Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Pool settings gathered in one place and passed to create_engine as
# keyword arguments
pool_settings = {
    "poolclass": QueuePool,
    "pool_size": 10,
    "max_overflow": 20,
    "pool_timeout": 30,
    "pool_recycle": 3600,
}

engine = create_engine('trino://testuser@localhost:8080/memory', **pool_settings)

Transaction Support

from sqlalchemy import create_engine, text
from trino.transaction import IsolationLevel

# Request an isolation level for the connection via connect_args
transactional_args = {"isolation_level": IsolationLevel.READ_COMMITTED}
engine = create_engine(
    'trino://testuser@localhost:8080/memory',
    connect_args=transactional_args,
)

# Both inserts run inside one explicit transaction
insert_statements = (
    "INSERT INTO users VALUES (1, 'Alice', 30)",
    "INSERT INTO users VALUES (2, 'Bob', 25)",
)

with engine.begin() as connection:
    for statement in insert_statements:
        connection.execute(text(statement))
    # Leaving the block commits; an exception inside it rolls back

SSL Configuration

from sqlalchemy import create_engine

# Certificate verification switched off -- for development only
insecure_tls_args = {"http_scheme": "https", "verify": False}
engine = create_engine(
    'trino://testuser@trino.example.com:443/hive',
    connect_args=insecure_tls_args,
)

# Verify the server certificate against a custom CA bundle file
custom_ca_args = {"http_scheme": "https", "verify": "/path/to/ca-bundle.crt"}
engine = create_engine(
    'trino://testuser@trino.example.com:443/hive',
    connect_args=custom_ca_args,
)

Custom HTTP Session

import requests
from sqlalchemy import create_engine

# Custom requests session
session = requests.Session()
session.headers.update({"User-Agent": "MyApp/1.0"})

# requests.Session has no session-wide timeout setting: assigning
# session.timeout is silently ignored. The timeout is therefore passed to
# the Trino connection itself via request_timeout (seconds).
engine = create_engine(
    'trino://testuser@localhost:8080/memory',
    connect_args={
        "http_session": session,
        "request_timeout": 60
    }
)

Error Handling

from sqlalchemy import create_engine, text
from sqlalchemy.exc import DBAPIError, SQLAlchemyError
from trino.exceptions import TrinoQueryError, TrinoUserError

engine = create_engine('trino://testuser@localhost:8080/memory')


def report_trino_error(error):
    """Print details of a Trino query error.

    Returns True when *error* was a Trino query error and has been reported,
    False otherwise so the caller can fall back to generic reporting.
    """
    # TrinoUserError is checked first, matching the handler order below
    # (most specific error first).
    if isinstance(error, TrinoUserError):
        print(f"Trino user error: {error.message}")
        print(f"Query ID: {error.query_id}")
        return True
    if isinstance(error, TrinoQueryError):
        print(f"Trino query error: {error.error_name}")
        print(f"Error code: {error.error_code}")
        return True
    return False


try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT * FROM nonexistent_table"))
        rows = result.fetchall()
except TrinoQueryError as e:
    # Trino error that reached us unwrapped
    report_trino_error(e)
except DBAPIError as e:
    # SQLAlchemy wraps exceptions raised by the DB-API driver in DBAPIError
    # and keeps the driver's own exception on `.orig`; unwrap it so the
    # Trino-specific details are not lost.
    if not report_trino_error(e.orig):
        print(f"SQLAlchemy error: {e}")
except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-trino

docs

authentication.md

db-api.md

index.md

low-level-client.md

sqlalchemy.md

tile.json