0
# SQLAlchemy Integration
1
2
SQLAlchemy dialect implementation enabling ORM usage, connection pooling, and SQL expression language support with Trino. Provides seamless integration with existing SQLAlchemy applications and frameworks.
3
4
## Capabilities
5
6
### Engine Creation
7
8
Standard SQLAlchemy engine creation with Trino-specific URL schemes and connection parameters.
9
10
```python { .api }
11
def create_engine(url: str, **kwargs) -> Engine
12
"""
13
Create SQLAlchemy engine for Trino connections.
14
15
Parameters:
16
- url: Connection URL in format 'trino://user:password@host:port/catalog/schema'
17
- **kwargs: Additional engine parameters including connect_args
18
19
Returns:
20
SQLAlchemy Engine instance
21
"""
22
```
23
24
### URL Factory
25
26
Utility function for programmatically constructing Trino connection URLs with proper parameter encoding and validation.
27
28
```python { .api }
29
def URL(
30
host: str,
31
port: Optional[int] = 8080,
32
user: Optional[str] = None,
33
password: Optional[str] = None,
34
catalog: Optional[str] = None,
35
schema: Optional[str] = None,
36
source: Optional[str] = "trino-sqlalchemy",
37
session_properties: Dict[str, str] = None,
38
http_headers: Dict[str, Union[str, int]] = None,
39
extra_credential: Optional[List[Tuple[str, str]]] = None,
40
client_tags: Optional[List[str]] = None,
41
legacy_primitive_types: Optional[bool] = None,
42
legacy_prepared_statements: Optional[bool] = None,
43
access_token: Optional[str] = None,
44
cert: Optional[str] = None,
45
key: Optional[str] = None,
46
verify: Optional[bool] = None,
47
roles: Optional[Dict[str, str]] = None
48
) -> str
49
"""
50
Create properly encoded Trino SQLAlchemy connection URL.
51
52
Parameters:
53
- host: Trino coordinator hostname (required)
54
- port: TCP port (default: 8080, required)
55
- user: Username for connection
56
- password: Password for basic authentication (requires user)
57
- catalog: Default catalog
58
- schema: Default schema (requires catalog)
59
- source: Client source identifier
60
- session_properties: Session configuration properties
61
- http_headers: Additional HTTP headers
62
- extra_credential: Extra credential key-value pairs
63
- client_tags: Client tags for query identification
64
- legacy_primitive_types: Use string representations for edge cases
65
- legacy_prepared_statements: Force legacy prepared statement protocol
66
- access_token: JWT token for authentication
67
- cert: Path to client certificate file
68
- key: Path to private key file
69
- verify: SSL certificate verification setting
70
- roles: Authorization roles per catalog
71
72
Returns:
73
Properly encoded connection URL string
74
"""
75
```
76
77
### Dialect Registration
78
79
The Trino dialect is automatically registered with SQLAlchemy under the "trino" scheme.
80
81
```python { .api }
82
# Dialect automatically registered as:
83
# "trino" -> "trino.sqlalchemy.dialect:TrinoDialect"
84
```
85
86
## Connection URL Formats
87
88
### Basic Connection
89
90
```python
91
# Minimal connection
92
'trino://user@host:port/catalog'
93
94
# With schema
95
'trino://user@host:port/catalog/schema'
96
97
# With password (Basic auth)
98
'trino://user:password@host:port/catalog/schema'
99
```
100
101
### URL Parameters
102
103
Connection parameters can be passed via URL query string:
104
105
```python
106
# Session properties
107
'trino://user@host:port/catalog?session_properties={"query_max_run_time": "1h"}'
108
109
# Client tags
110
'trino://user@host:port/catalog?client_tags=["tag1", "tag2"]'
111
112
# Roles
113
'trino://user@host:port/catalog?roles={"catalog1": "role1"}'
114
115
# JWT token
116
'trino://user@host:port/catalog?access_token=jwt_token_here'
117
118
# Certificate authentication
119
'trino://user@host:port/catalog?cert=/path/to/cert.pem&key=/path/to/key.pem'
120
```
121
122
## Usage Examples
123
124
### Basic Engine Creation
125
126
```python
127
from sqlalchemy import create_engine
128
129
# Simple connection
130
engine = create_engine('trino://testuser@localhost:8080/memory')
131
132
# With schema
133
engine = create_engine('trino://testuser@localhost:8080/memory/default')
134
135
# HTTPS connection
136
engine = create_engine('trino://testuser@trino.example.com:443/hive')
137
```
138
139
### Authentication Examples
140
141
```python
142
from sqlalchemy import create_engine
143
from trino.auth import BasicAuthentication, JWTAuthentication, OAuth2Authentication
144
145
# Basic authentication via URL
146
engine = create_engine('trino://alice:password@trino.example.com:443/hive')
147
148
# Basic authentication via connect_args
149
engine = create_engine(
150
'trino://alice@trino.example.com:443/hive',
151
connect_args={
152
"auth": BasicAuthentication("alice", "password"),
153
"http_scheme": "https"
154
}
155
)
156
157
# JWT authentication
158
engine = create_engine(
159
'trino://alice@trino.example.com:443/hive',
160
connect_args={
161
"auth": JWTAuthentication("jwt_token_here"),
162
"http_scheme": "https"
163
}
164
)
165
166
# OAuth2 authentication
167
engine = create_engine(
168
'trino://alice@trino.example.com:443/hive',
169
connect_args={
170
"auth": OAuth2Authentication(),
171
"http_scheme": "https"
172
}
173
)
174
```
175
176
### URL Factory Usage
177
178
```python
179
from trino.sqlalchemy import URL
180
from sqlalchemy import create_engine
181
182
# Programmatic URL construction
183
url = URL(
184
host="trino.example.com",
185
port=443,
186
user="alice",
187
catalog="hive",
188
schema="warehouse",
189
client_tags=["analytics", "prod"]
190
)
191
192
engine = create_engine(url)
193
```
194
195
### Connection Configuration
196
197
```python
198
from sqlalchemy import create_engine
199
200
# Session properties and client configuration
201
engine = create_engine(
202
'trino://alice@trino.example.com:443/hive',
203
connect_args={
204
"session_properties": {
205
"query_max_run_time": "1h",
206
"join_distribution_type": "BROADCAST"
207
},
208
"client_tags": ["analytics", "dashboard"],
209
"roles": {"hive": "admin", "system": "reader"},
210
"timezone": "America/New_York",
211
"http_scheme": "https"
212
}
213
)
214
```
215
216
### Basic Query Execution
217
218
```python
219
from sqlalchemy import create_engine, text
220
221
engine = create_engine('trino://testuser@localhost:8080/memory')
222
223
# Text query execution
224
with engine.connect() as connection:
225
result = connection.execute(text("SELECT * FROM system.runtime.nodes"))
226
for row in result:
227
print(dict(row))
228
```
229
230
### Table Reflection
231
232
```python
233
from sqlalchemy import create_engine, MetaData, Table
234
235
engine = create_engine('trino://testuser@localhost:8080/memory')
236
metadata = MetaData()
237
238
# Reflect table structure
239
users_table = Table(
240
'users',
241
metadata,
242
schema='default',
243
autoload_with=engine
244
)
245
246
# Access column information
247
for column in users_table.columns:
248
print(f"Column: {column.name}, Type: {column.type}")
249
```
250
251
### SQL Expression Language
252
253
```python
254
from sqlalchemy import create_engine, MetaData, Table, select
255
256
engine = create_engine('trino://testuser@localhost:8080/memory')
257
metadata = MetaData()
258
259
users = Table('users', metadata, schema='default', autoload_with=engine)
260
261
# Build and execute query
262
query = select(users.c.name, users.c.age).where(users.c.age > 25)
263
264
with engine.connect() as connection:
265
result = connection.execute(query)
266
for row in result:
267
print(f"Name: {row.name}, Age: {row.age}")
268
```
269
270
### ORM Usage
271
272
```python
273
from sqlalchemy import create_engine, Column, Integer, String
274
from sqlalchemy.ext.declarative import declarative_base
275
from sqlalchemy.orm import sessionmaker
276
277
engine = create_engine('trino://testuser@localhost:8080/memory')
278
Base = declarative_base()
279
280
class User(Base):
281
__tablename__ = 'users'
282
__table_args__ = {'schema': 'default'}
283
284
id = Column(Integer, primary_key=True)
285
name = Column(String)
286
age = Column(Integer)
287
288
Session = sessionmaker(bind=engine)
289
session = Session()
290
291
# Query using ORM
292
users = session.query(User).filter(User.age > 25).all()
293
for user in users:
294
print(f"Name: {user.name}, Age: {user.age}")
295
```
296
297
### Connection Pooling
298
299
```python
300
from sqlalchemy import create_engine
301
from sqlalchemy.pool import QueuePool
302
303
# Custom connection pool configuration
304
engine = create_engine(
305
'trino://testuser@localhost:8080/memory',
306
poolclass=QueuePool,
307
pool_size=10,
308
max_overflow=20,
309
pool_timeout=30,
310
pool_recycle=3600
311
)
312
```
313
314
### Transaction Support
315
316
```python
317
from sqlalchemy import create_engine, text
318
from trino.transaction import IsolationLevel
319
320
# Engine with transaction support
321
engine = create_engine(
322
'trino://testuser@localhost:8080/memory',
323
connect_args={
324
"isolation_level": IsolationLevel.READ_COMMITTED
325
}
326
)
327
328
# Explicit transaction
329
with engine.begin() as connection:
330
connection.execute(text("INSERT INTO users VALUES (1, 'Alice', 30)"))
331
connection.execute(text("INSERT INTO users VALUES (2, 'Bob', 25)"))
332
# Auto-commit on success, rollback on exception
333
```
334
335
### SSL Configuration
336
337
```python
338
from sqlalchemy import create_engine
339
340
# SSL verification disabled (development only)
341
engine = create_engine(
342
'trino://testuser@trino.example.com:443/hive',
343
connect_args={
344
"http_scheme": "https",
345
"verify": False
346
}
347
)
348
349
# Custom CA bundle
350
engine = create_engine(
351
'trino://testuser@trino.example.com:443/hive',
352
connect_args={
353
"http_scheme": "https",
354
"verify": "/path/to/ca-bundle.crt"
355
}
356
)
357
```
358
359
### Custom HTTP Session
360
361
```python
362
import requests
363
from sqlalchemy import create_engine
364
365
# Custom requests session
366
session = requests.Session()
367
session.headers.update({"User-Agent": "MyApp/1.0"})
368
session.timeout = 60
369
370
engine = create_engine(
371
'trino://testuser@localhost:8080/memory',
372
connect_args={
373
"http_session": session
374
}
375
)
376
```
377
378
### Error Handling
379
380
```python
381
from sqlalchemy import create_engine, text
382
from sqlalchemy.exc import SQLAlchemyError
383
from trino.exceptions import TrinoQueryError, TrinoUserError
384
385
engine = create_engine('trino://testuser@localhost:8080/memory')
386
387
try:
388
with engine.connect() as connection:
389
result = connection.execute(text("SELECT * FROM nonexistent_table"))
390
rows = result.fetchall()
391
except TrinoUserError as e:
392
print(f"Trino user error: {e.message}")
393
print(f"Query ID: {e.query_id}")
394
except TrinoQueryError as e:
395
print(f"Trino query error: {e.error_name}")
396
print(f"Error code: {e.error_code}")
397
except SQLAlchemyError as e:
398
print(f"SQLAlchemy error: {e}")
399
```