Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.
SQLAlchemy dialect implementation enabling ORM usage, connection pooling, and SQL expression language support with Trino. Provides seamless integration with existing SQLAlchemy applications and frameworks.
Standard SQLAlchemy engine creation with Trino-specific URL schemes and connection parameters.
def create_engine(url: str, **kwargs) -> Engine
"""
Create SQLAlchemy engine for Trino connections.
Parameters:
- url: Connection URL in format 'trino://user:password@host:port/catalog/schema'
- **kwargs: Additional engine parameters including connect_args
Returns:
SQLAlchemy Engine instance
"""Utility function for programmatically constructing Trino connection URLs with proper parameter encoding and validation.
def URL(
host: str,
port: Optional[int] = 8080,
user: Optional[str] = None,
password: Optional[str] = None,
catalog: Optional[str] = None,
schema: Optional[str] = None,
source: Optional[str] = "trino-sqlalchemy",
session_properties: Dict[str, str] = None,
http_headers: Dict[str, Union[str, int]] = None,
extra_credential: Optional[List[Tuple[str, str]]] = None,
client_tags: Optional[List[str]] = None,
legacy_primitive_types: Optional[bool] = None,
legacy_prepared_statements: Optional[bool] = None,
access_token: Optional[str] = None,
cert: Optional[str] = None,
key: Optional[str] = None,
verify: Optional[bool] = None,
roles: Optional[Dict[str, str]] = None
) -> str
"""
Create properly encoded Trino SQLAlchemy connection URL.
Parameters:
- host: Trino coordinator hostname (required)
- port: TCP port (default: 8080, required)
- user: Username for connection
- password: Password for basic authentication (requires user)
- catalog: Default catalog
- schema: Default schema (requires catalog)
- source: Client source identifier
- session_properties: Session configuration properties
- http_headers: Additional HTTP headers
- extra_credential: Extra credential key-value pairs
- client_tags: Client tags for query identification
- legacy_primitive_types: Use string representations for edge cases
- legacy_prepared_statements: Force legacy prepared statement protocol
- access_token: JWT token for authentication
- cert: Path to client certificate file
- key: Path to private key file
- verify: SSL certificate verification setting
- roles: Authorization roles per catalog
Returns:
Properly encoded connection URL string
"""The Trino dialect is automatically registered with SQLAlchemy under the "trino" scheme.
# Dialect automatically registered as:
# "trino" -> "trino.sqlalchemy.dialect:TrinoDialect"# Minimal connection
'trino://user@host:port/catalog'
# With schema
'trino://user@host:port/catalog/schema'
# With password (Basic auth)
'trino://user:password@host:port/catalog/schema'Connection parameters can be passed via URL query string:
# Session properties
'trino://user@host:port/catalog?session_properties={"query_max_run_time": "1h"}'
# Client tags
'trino://user@host:port/catalog?client_tags=["tag1", "tag2"]'
# Roles
'trino://user@host:port/catalog?roles={"catalog1": "role1"}'
# JWT token
'trino://user@host:port/catalog?access_token=jwt_token_here'
# Certificate authentication
'trino://user@host:port/catalog?cert=/path/to/cert.pem&key=/path/to/key.pem'from sqlalchemy import create_engine
# Simple connection
engine = create_engine('trino://testuser@localhost:8080/memory')
# With schema
engine = create_engine('trino://testuser@localhost:8080/memory/default')
# HTTPS connection
engine = create_engine('trino://testuser@trino.example.com:443/hive')from sqlalchemy import create_engine
from trino.auth import BasicAuthentication, JWTAuthentication, OAuth2Authentication
# Basic authentication via URL
engine = create_engine('trino://alice:password@trino.example.com:443/hive')
# Basic authentication via connect_args
engine = create_engine(
'trino://alice@trino.example.com:443/hive',
connect_args={
"auth": BasicAuthentication("alice", "password"),
"http_scheme": "https"
}
)
# JWT authentication
engine = create_engine(
'trino://alice@trino.example.com:443/hive',
connect_args={
"auth": JWTAuthentication("jwt_token_here"),
"http_scheme": "https"
}
)
# OAuth2 authentication
engine = create_engine(
'trino://alice@trino.example.com:443/hive',
connect_args={
"auth": OAuth2Authentication(),
"http_scheme": "https"
}
)from trino.sqlalchemy import URL
from sqlalchemy import create_engine
# Programmatic URL construction
url = URL(
host="trino.example.com",
port=443,
user="alice",
catalog="hive",
schema="warehouse",
client_tags=["analytics", "prod"]
)
engine = create_engine(url)from sqlalchemy import create_engine
# Session properties and client configuration
engine = create_engine(
'trino://alice@trino.example.com:443/hive',
connect_args={
"session_properties": {
"query_max_run_time": "1h",
"join_distribution_type": "BROADCAST"
},
"client_tags": ["analytics", "dashboard"],
"roles": {"hive": "admin", "system": "reader"},
"timezone": "America/New_York",
"http_scheme": "https"
}
)from sqlalchemy import create_engine, text
engine = create_engine('trino://testuser@localhost:8080/memory')
# Text query execution
with engine.connect() as connection:
result = connection.execute(text("SELECT * FROM system.runtime.nodes"))
for row in result:
print(dict(row))from sqlalchemy import create_engine, MetaData, Table
engine = create_engine('trino://testuser@localhost:8080/memory')
metadata = MetaData()
# Reflect table structure
users_table = Table(
'users',
metadata,
schema='default',
autoload_with=engine
)
# Access column information
for column in users_table.columns:
print(f"Column: {column.name}, Type: {column.type}")from sqlalchemy import create_engine, MetaData, Table, select
engine = create_engine('trino://testuser@localhost:8080/memory')
metadata = MetaData()
users = Table('users', metadata, schema='default', autoload_with=engine)
# Build and execute query
query = select(users.c.name, users.c.age).where(users.c.age > 25)
with engine.connect() as connection:
result = connection.execute(query)
for row in result:
print(f"Name: {row.name}, Age: {row.age}")from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('trino://testuser@localhost:8080/memory')
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
__table_args__ = {'schema': 'default'}
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
Session = sessionmaker(bind=engine)
session = Session()
# Query using ORM
users = session.query(User).filter(User.age > 25).all()
for user in users:
print(f"Name: {user.name}, Age: {user.age}")from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Custom connection pool configuration
engine = create_engine(
'trino://testuser@localhost:8080/memory',
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=3600
)from sqlalchemy import create_engine, text
from trino.transaction import IsolationLevel
# Engine with transaction support
engine = create_engine(
'trino://testuser@localhost:8080/memory',
connect_args={
"isolation_level": IsolationLevel.READ_COMMITTED
}
)
# Explicit transaction
with engine.begin() as connection:
connection.execute(text("INSERT INTO users VALUES (1, 'Alice', 30)"))
connection.execute(text("INSERT INTO users VALUES (2, 'Bob', 25)"))
# Auto-commit on success, rollback on exceptionfrom sqlalchemy import create_engine
# SSL verification disabled (development only)
engine = create_engine(
'trino://testuser@trino.example.com:443/hive',
connect_args={
"http_scheme": "https",
"verify": False
}
)
# Custom CA bundle
engine = create_engine(
'trino://testuser@trino.example.com:443/hive',
connect_args={
"http_scheme": "https",
"verify": "/path/to/ca-bundle.crt"
}
)import requests
from sqlalchemy import create_engine
# Custom requests session
session = requests.Session()
session.headers.update({"User-Agent": "MyApp/1.0"})
session.timeout = 60
engine = create_engine(
'trino://testuser@localhost:8080/memory',
connect_args={
"http_session": session
}
)from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from trino.exceptions import TrinoQueryError, TrinoUserError
engine = create_engine('trino://testuser@localhost:8080/memory')
try:
with engine.connect() as connection:
result = connection.execute(text("SELECT * FROM nonexistent_table"))
rows = result.fetchall()
except TrinoUserError as e:
print(f"Trino user error: {e.message}")
print(f"Query ID: {e.query_id}")
except TrinoQueryError as e:
print(f"Trino query error: {e.error_name}")
print(f"Error code: {e.error_code}")
except SQLAlchemyError as e:
print(f"SQLAlchemy error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-trino