Client for the Trino distributed SQL Engine with DB-API 2.0 support, low-level client interface, and SQLAlchemy dialect.
Comprehensive authentication support for Trino connections including Basic HTTP authentication, JWT tokens, OAuth2 flows, Kerberos/GSSAPI, and client certificates. All authentication mechanisms integrate seamlessly with both DB-API and low-level client interfaces.
Abstract interface for all authentication implementations, providing HTTP session configuration and exception handling.
class Authentication:
def set_http_session(self, http_session: Session) -> Session
"""
Configure HTTP session with authentication mechanism.
Parameters:
- http_session: requests.Session to configure
Returns:
Configured session instance
"""
def get_exceptions(self) -> Tuple[Any, ...]
"""
Return tuple of exception types that should trigger retries.
Returns:
Tuple of exception classes
"""

HTTP Basic authentication using username and password credentials, suitable for development and testing environments.
class BasicAuthentication(Authentication):
def __init__(self, username: str, password: str)
"""
Initialize Basic authentication.
Parameters:
- username: Username for authentication
- password: Password for authentication
"""
def set_http_session(self, http_session: Session) -> Session
"""Configure session with HTTP Basic auth."""
def get_exceptions(self) -> Tuple[Any, ...]
"""Returns empty tuple (no specific exceptions)."""

Bearer token authentication using JSON Web Tokens, commonly used with external identity providers and service accounts.
class JWTAuthentication(Authentication):
def __init__(self, token: str)
"""
Initialize JWT authentication.
Parameters:
- token: JWT token string
"""
def set_http_session(self, http_session: Session) -> Session
"""Configure session with Bearer token authentication."""
def get_exceptions(self) -> Tuple[Any, ...]
"""Returns empty tuple (no specific exceptions)."""

Full OAuth2 flow implementation with configurable redirect handlers and automatic token caching via keyring or in-memory storage.
class OAuth2Authentication(Authentication):
def __init__(
self,
redirect_auth_url_handler: CompositeRedirectHandler = CompositeRedirectHandler([
WebBrowserRedirectHandler(),
ConsoleRedirectHandler()
])
)
"""
Initialize OAuth2 authentication with redirect handling.
Parameters:
- redirect_auth_url_handler: Handler for OAuth2 redirect URLs
"""
def set_http_session(self, http_session: Session) -> Session
"""Configure session with OAuth2 token bearer authentication."""
def get_exceptions(self) -> Tuple[Any, ...]
"""Returns empty tuple (no specific exceptions)."""

Configurable handlers for OAuth2 authentication redirects supporting browser automation and console output.
class RedirectHandler:
"""Abstract base class for OAuth2 redirect handlers."""
def __call__(self, url: str) -> None
"""Handle redirect URL."""
class ConsoleRedirectHandler(RedirectHandler):
"""Print OAuth2 URL to console for manual browser navigation."""
def __call__(self, url: str) -> None
"""Print authentication URL to console."""
class WebBrowserRedirectHandler(RedirectHandler):
"""Automatically open OAuth2 URL in system browser."""
def __call__(self, url: str) -> None
"""Open URL in default web browser."""
class CompositeRedirectHandler(RedirectHandler):
"""Combine multiple redirect handlers."""
def __init__(self, handlers: List[Callable[[str], None]])
def __call__(self, url: str) -> None
"""Execute all configured handlers in sequence."""

Kerberos authentication using requests-kerberos library with comprehensive configuration options for enterprise environments.
class KerberosAuthentication(Authentication):
# Class constants for mutual authentication
MUTUAL_REQUIRED = 1
MUTUAL_OPTIONAL = 2
MUTUAL_DISABLED = 3
def __init__(
self,
config: Optional[str] = None,
service_name: Optional[str] = None,
mutual_authentication: int = MUTUAL_REQUIRED,
force_preemptive: bool = False,
hostname_override: Optional[str] = None,
sanitize_mutual_error_response: bool = True,
principal: Optional[str] = None,
delegate: bool = False,
ca_bundle: Optional[str] = None
)
"""
Initialize Kerberos authentication.
Parameters:
- config: Path to Kerberos configuration file
- service_name: Service name for authentication
- mutual_authentication: Mutual authentication requirement level
- force_preemptive: Force preemptive authentication
- hostname_override: Override hostname for service principal
- sanitize_mutual_error_response: Sanitize error responses
- principal: Kerberos principal name
- delegate: Enable credential delegation
- ca_bundle: Path to CA certificate bundle
"""
def set_http_session(self, http_session: Session) -> Session
"""Configure session with Kerberos authentication."""
def get_exceptions(self) -> Tuple[Any, ...]
"""Returns KerberosExchangeError tuple."""

GSSAPI authentication using requests-gssapi library, providing an alternative to Kerberos with potentially better PyPy compatibility.
class GSSAPIAuthentication(Authentication):
# Class constants for mutual authentication
MUTUAL_REQUIRED = 1
MUTUAL_OPTIONAL = 2
MUTUAL_DISABLED = 3
def __init__(
self,
config: Optional[str] = None,
service_name: Optional[str] = None,
mutual_authentication: int = MUTUAL_DISABLED,
force_preemptive: bool = False,
hostname_override: Optional[str] = None,
sanitize_mutual_error_response: bool = True,
principal: Optional[str] = None,
delegate: bool = False,
ca_bundle: Optional[str] = None
)
"""
Initialize GSSAPI authentication.
Parameters:
- config: Path to Kerberos configuration file
- service_name: Service name for authentication
- mutual_authentication: Mutual authentication requirement level
- force_preemptive: Force opportunistic authentication
- hostname_override: Override hostname for service principal
- sanitize_mutual_error_response: Sanitize error responses
- principal: Principal name for credential acquisition
- delegate: Enable credential delegation
- ca_bundle: Path to CA certificate bundle
"""
def set_http_session(self, http_session: Session) -> Session
"""Configure session with GSSAPI authentication."""
def get_exceptions(self) -> Tuple[Any, ...]
"""Returns SPNEGOExchangeError tuple."""

Client certificate authentication using X.509 certificates for mutual TLS authentication.
class CertificateAuthentication(Authentication):
def __init__(self, cert: str, key: str)
"""
Initialize certificate authentication.
Parameters:
- cert: Path to client certificate file (PEM format)
- key: Path to private key file (PEM format)
"""
def set_http_session(self, http_session: Session) -> Session
"""Configure session with client certificate."""
def get_exceptions(self) -> Tuple[Any, ...]
"""Returns empty tuple (no specific exceptions)."""

from trino.dbapi import connect
from trino.auth import BasicAuthentication
conn = connect(
host="trino.example.com",
port=443,
user="alice",
auth=BasicAuthentication("alice", "secret_password"),
http_scheme="https"
)

from trino.dbapi import connect
from trino.auth import JWTAuthentication
# JWT token from external identity provider
jwt_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
conn = connect(
host="trino.example.com",
port=443,
user="service-account",
auth=JWTAuthentication(jwt_token),
http_scheme="https"
)

from trino.dbapi import connect
from trino.auth import OAuth2Authentication, CompositeRedirectHandler, ConsoleRedirectHandler
# Default behavior: open browser and print to console
conn = connect(
host="trino.example.com",
port=443,
user="alice",
auth=OAuth2Authentication(),
http_scheme="https"
)
# Console-only redirect (for headless environments)
console_auth = OAuth2Authentication(
redirect_auth_url_handler=CompositeRedirectHandler([ConsoleRedirectHandler()])
)
conn = connect(
host="trino.example.com",
port=443,
user="alice",
auth=console_auth,
http_scheme="https"
)

from trino.dbapi import connect
from trino.auth import KerberosAuthentication
# Basic Kerberos setup
auth = KerberosAuthentication()
conn = connect(
host="trino.example.com",
port=443,
user="alice@EXAMPLE.COM",
auth=auth,
http_scheme="https"
)
# Advanced Kerberos configuration
auth = KerberosAuthentication(
config="/etc/krb5.conf",
service_name="HTTP",
mutual_authentication=KerberosAuthentication.MUTUAL_REQUIRED,
hostname_override="trino.example.com",
principal="alice@EXAMPLE.COM",
delegate=True
)
conn = connect(
host="trino.example.com",
port=443,
user="alice@EXAMPLE.COM",
auth=auth,
http_scheme="https"
)

from trino.dbapi import connect
from trino.auth import GSSAPIAuthentication
# GSSAPI with service name and hostname
auth = GSSAPIAuthentication(
service_name="HTTP",
hostname_override="trino.example.com",
principal="alice@EXAMPLE.COM"
)
conn = connect(
host="trino.example.com",
port=443,
user="alice@EXAMPLE.COM",
auth=auth,
http_scheme="https"
)

from trino.dbapi import connect
from trino.auth import CertificateAuthentication
auth = CertificateAuthentication(
cert="/path/to/client.crt",
key="/path/to/client.key"
)
conn = connect(
host="trino.example.com",
port=443,
user="alice",
auth=auth,
http_scheme="https"
)

from sqlalchemy import create_engine
from trino.auth import BasicAuthentication
# Basic auth via connection string
engine = create_engine('trino://alice:password@trino.example.com:443/catalog')
# Authentication via connect_args
engine = create_engine(
'trino://alice@trino.example.com:443/catalog',
connect_args={
"auth": BasicAuthentication("alice", "password"),
"http_scheme": "https"
}
)

from trino.dbapi import connect
from trino.auth import BasicAuthentication
# Connect as service account but execute queries as different user
conn = connect(
host="trino.example.com",
port=443,
user="alice", # User for query execution and authorization
auth=BasicAuthentication("service", "service_password"), # Authentication credentials
http_scheme="https"
)

from trino.dbapi import connect
# Pass additional credentials for connectors
conn = connect(
host="trino.example.com",
port=443,
user="alice",
extra_credential=[
("custom.username", "connector_user"),
("custom.password", "connector_password")
]
)

Install with Tessl CLI
npx tessl i tessl/pypi-trino