# SQLAlchemy Integration

A partial SQLAlchemy dialect that supports textual SQL execution and basic database operations, so Phoenix can be used from SQLAlchemy applications while keeping Phoenix-specific functionality available.

## Capabilities

### Phoenix Dialect

SQLAlchemy dialect for Phoenix database connectivity through phoenixdb.

```python { .api }
class PhoenixDialect(DefaultDialect):
    """
    Phoenix dialect for SQLAlchemy.

    Provides basic SQLAlchemy integration for Phoenix databases with support for
    textual SQL execution and connection management.
    """

    name = "phoenix"
    """Dialect name for SQLAlchemy registration."""

    driver = "phoenixdb"
    """Driver name used by SQLAlchemy."""
```

### Engine Creation

```python { .api }
def create_engine(url, **kwargs):
    """
    Creates SQLAlchemy engine for Phoenix connections.

    URL format: phoenix://host:port[/path][?parameters]

    Parameters:
    - url (str): Connection URL in Phoenix format
    - connect_args (dict): Additional phoenixdb.connect() parameters
    - **kwargs: Standard SQLAlchemy engine parameters

    Returns:
    Engine: SQLAlchemy engine instance
    """
```

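The URL format above implies that a `phoenix://` URL is translated into the HTTP endpoint that phoenixdb connects to. The following is a minimal sketch of such a translation using only the standard library. The helper name `phoenix_url_to_http`, its `tls` flag, and the fallback port are assumptions made for illustration; they are not the dialect's actual API.

```python
from urllib.parse import urlsplit, urlunsplit


def phoenix_url_to_http(url, tls=False):
    """Translate a phoenix:// URL into an HTTP(S) URL (hypothetical helper)."""
    parts = urlsplit(url)
    if parts.scheme != "phoenix":
        raise ValueError(f"expected a phoenix:// URL, got {url!r}")

    host = parts.hostname or "localhost"
    # Assumption: fall back to 8765, the port used in the examples on this page
    port = parts.port or 8765
    scheme = "https" if tls else "http"

    # Keep the optional path and query string from the original URL
    return urlunsplit((scheme, f"{host}:{port}", parts.path or "/", parts.query, ""))


print(phoenix_url_to_http("phoenix://localhost:8765"))
```

Under these assumptions, the resulting string has the same shape as the URL passed to `phoenixdb.connect()` when the driver is used directly.
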
### Execution Context

```python { .api }
class PhoenixExecutionContext(DefaultExecutionContext):
    """
    Phoenix-specific execution context for SQLAlchemy operations.
    """

    def should_autocommit_text(self, statement):
        """
        Determines if statement should be autocommitted.

        Parameters:
        - statement (str): SQL statement text

        Returns:
        bool: True if statement requires autocommit (DDL/DML operations)
        """
```

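One plausible way to decide whether a textual statement needs autocommit is to match its leading keyword against a list of DDL/DML verbs. The sketch below illustrates that idea only; the pattern, the keyword list, and the function name `needs_autocommit` are assumptions, not the dialect's actual implementation.

```python
import re

# Hypothetical keyword list: statements that change data or schema
_AUTOCOMMIT_RE = re.compile(
    r"\s*(?:UPSERT|DELETE|CREATE|DROP|ALTER)\b",
    re.IGNORECASE,
)


def needs_autocommit(statement):
    """Return True if the statement starts with a DDL/DML keyword."""
    return _AUTOCOMMIT_RE.match(statement) is not None


for sql in ("UPSERT INTO users VALUES (1, 'admin')", "SELECT 1"):
    print(sql, "->", needs_autocommit(sql))
```

Matching only the first keyword keeps the check cheap, at the cost of not looking inside comments or multi-statement strings.
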
### DDL Compiler

```python { .api }
class PhoenixDDLCompiler(DDLCompiler):
    """
    DDL compiler for Phoenix-specific SQL generation.
    """

    def visit_primary_key_constraint(self, constraint):
        """
        Compiles primary key constraints.

        Parameters:
        - constraint: SQLAlchemy PrimaryKeyConstraint

        Returns:
        str: Phoenix-compatible PRIMARY KEY clause

        Raises:
        CompileError: If constraint has no name (required by Phoenix)
        """
```

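The naming requirement described above can be illustrated without SQLAlchemy. The sketch below is a dependency-free stand-in: `render_primary_key` is a hypothetical function, and it raises `ValueError` where the real compiler is documented to raise `CompileError`.

```python
def render_primary_key(name, columns):
    """Render a named PRIMARY KEY clause (illustrative stand-in only)."""
    if not name:
        # Mirrors the documented rule: Phoenix requires a constraint name
        raise ValueError("Phoenix requires a named PRIMARY KEY constraint")
    if not columns:
        raise ValueError("a PRIMARY KEY constraint needs at least one column")
    return f"CONSTRAINT {name} PRIMARY KEY ({', '.join(columns)})"


print(render_primary_key("pk", ["id", "ts"]))
```
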
## Usage Examples

### Basic Engine Setup

```python
from sqlalchemy import create_engine, text

# Basic connection
engine = create_engine('phoenix://localhost:8765')

# With connection arguments
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'autocommit': True,
        'authentication': 'BASIC',
        'avatica_user': 'username',
        'avatica_password': 'password'
    }
)

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())
```

### URL Configuration

```python
from sqlalchemy import create_engine

# Basic URL
engine = create_engine('phoenix://localhost:8765')

# With HTTPS
engine = create_engine('phoenix://secure-host:8765',
                       connect_args={'verify': '/path/to/cert.pem'})

# SPNEGO authentication with a custom truststore, passed via connect_args
url = 'phoenix://localhost:8765'
engine = create_engine(url, connect_args={
    'authentication': 'SPNEGO',
    'truststore': '/path/to/truststore.pem'
})
```

### Textual SQL Execution

The Phoenix dialect primarily supports textual SQL execution:

```python
from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

with engine.connect() as conn:
    # Create table
    conn.execute(text("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username VARCHAR,
            email VARCHAR
        )
    """))

    # Insert data (text() takes named :parameters, passed as a dict)
    conn.execute(text("UPSERT INTO users VALUES (:id, :username, :email)"),
                 {'id': 1, 'username': 'admin', 'email': 'admin@example.com'})

    # Query data
    result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {'id': 1})
    user = result.fetchone()
    print(f"User: {user}")

    # Bulk operations (a list of dicts runs the statement once per row)
    users_data = [
        {'id': 2, 'username': 'john', 'email': 'john@example.com'},
        {'id': 3, 'username': 'jane', 'email': 'jane@example.com'}
    ]
    conn.execute(text("UPSERT INTO users VALUES (:id, :username, :email)"), users_data)

    # Query all users
    result = conn.execute(text("SELECT * FROM users ORDER BY id"))
    for row in result:
        print(f"ID: {row.id}, Username: {row.username}, Email: {row.email}")
```

### Connection Pool Configuration

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    'phoenix://localhost:8765',
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    connect_args={
        'autocommit': True,
        'max_retries': 3
    }
)

# Use the pooled connections
def execute_query(sql, params=None):
    with engine.connect() as conn:
        if params:
            result = conn.execute(text(sql), params)
        else:
            result = conn.execute(text(sql))
        # Fetch rows before the connection is returned to the pool
        return result.fetchall()

# Multiple operations will reuse pooled connections
rows1 = execute_query("SELECT COUNT(*) FROM users")
rows2 = execute_query("SELECT * FROM users WHERE id = :id", {'id': 1})
```

### Transaction Management

```python
from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

# Automatic transaction management
with engine.begin() as conn:
    # All operations in single transaction
    # (Phoenix writes rows with UPSERT; it has no INSERT statement)
    conn.execute(text("UPSERT INTO audit_log VALUES (:id, :event, :created)"),
                 {'id': 1, 'event': 'user_created', 'created': '2023-01-01'})

    conn.execute(text("UPSERT INTO users VALUES (:id, :username)"),
                 {'id': 1, 'username': 'new_user'})

    # Automatic commit on successful completion
    # Automatic rollback on exception

# Manual transaction control
with engine.connect() as conn:
    trans = conn.begin()
    try:
        conn.execute(text("DELETE FROM temp_table"))
        conn.execute(text("UPSERT INTO temp_table SELECT * FROM source_table"))
        trans.commit()
    except Exception as e:
        print(f"Error: {e}")
        trans.rollback()
```

### Metadata Inspection

```python
from sqlalchemy import create_engine, inspect

engine = create_engine('phoenix://localhost:8765')
inspector = inspect(engine)

# Note: Limited metadata support in Phoenix dialect
try:
    # Get table names
    table_names = inspector.get_table_names()
    print(f"Tables: {table_names}")

    # Get schema names
    schema_names = inspector.get_schema_names()
    print(f"Schemas: {schema_names}")

except NotImplementedError:
    print("Metadata introspection not fully supported")

# Use direct phoenixdb metadata instead
raw_conn = engine.raw_connection()
try:
    phoenixdb_conn = raw_conn.connection  # Get underlying phoenixdb connection
    meta = phoenixdb_conn.meta()

    tables = meta.get_tables()
    for table in tables:
        print(f"Table: {table['TABLE_SCHEM']}.{table['TABLE_NAME']}")
finally:
    raw_conn.close()  # Return the connection to the pool
```

### Authentication Configuration

```python
from sqlalchemy import create_engine

# Basic authentication
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'authentication': 'BASIC',
        'avatica_user': 'username',
        'avatica_password': 'password'
    }
)

# SPNEGO/Kerberos authentication
engine = create_engine(
    'phoenix://secure-host:8765',
    connect_args={
        'authentication': 'SPNEGO',
        'verify': '/path/to/truststore.pem'
    }
)

# Custom authentication using requests.auth
from requests.auth import HTTPBasicAuth

engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'auth': HTTPBasicAuth('username', 'password')
    }
)
```

### Error Handling

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import phoenixdb

engine = create_engine('phoenix://localhost:8765')

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM nonexistent_table"))

except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

    # Access underlying phoenixdb exception
    # (not every SQLAlchemyError carries .orig, so look it up defensively)
    orig = getattr(e, 'orig', None)
    if hasattr(orig, 'message'):
        print(f"Phoenix error: {orig.message}")
    if hasattr(orig, 'sqlstate'):
        print(f"SQL state: {orig.sqlstate}")

except phoenixdb.Error as e:
    print(f"phoenixdb error: {e.message}")
```

### Phoenix-Specific Features

```python
from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

with engine.connect() as conn:
    # Phoenix UPSERT operations
    conn.execute(text("""
        UPSERT INTO users (id, username)
        VALUES (1, 'updated_user')
    """))

    # Array columns
    conn.execute(text("""
        CREATE TABLE test_arrays (
            id INTEGER PRIMARY KEY,
            numbers INTEGER ARRAY
        )
    """))

    conn.execute(text("UPSERT INTO test_arrays VALUES (:id, :numbers)"),
                 {'id': 1, 'numbers': [1, 2, 3, 4, 5]})

    # Phoenix functions and operators (array membership is tested with ANY)
    result = conn.execute(text("""
        SELECT id, ARRAY_LENGTH(numbers) as array_len
        FROM test_arrays
        WHERE 3 = ANY(numbers)
    """))

    for row in result:
        print(f"ID: {row.id}, Array Length: {row.array_len}")

    # Phoenix-specific data types
    conn.execute(text("""
        CREATE TABLE phoenix_types (
            id UNSIGNED_INT PRIMARY KEY,
            big_num UNSIGNED_LONG,
            precise_decimal DECIMAL(20,10)
        )
    """))
```

## Limitations

The Phoenix SQLAlchemy dialect is incomplete. It primarily supports:

- **Textual SQL execution** via the `text()` construct
- **Basic connection management** and pooling
- **Transaction support** for Phoenix ACID operations
- **Parameter binding** for prepared statements

**Not supported:**
- SQLAlchemy ORM (Object-Relational Mapping)
- Core Table/Column abstraction
- Schema migration tools (Alembic)
- Advanced SQLAlchemy features (joins, subqueries via Core)

For functionality the dialect does not cover, use phoenixdb directly alongside SQLAlchemy.

```python
# Recommended approach for complex applications
from sqlalchemy import create_engine, text
import phoenixdb
import phoenixdb.cursor

# Use SQLAlchemy for basic operations
engine = create_engine('phoenix://localhost:8765')

# Use phoenixdb directly for advanced features
phoenixdb_conn = phoenixdb.connect('http://localhost:8765/')
meta = phoenixdb_conn.meta()
tables = meta.get_tables()

# Combine both as needed
with engine.connect() as sqlalchemy_conn:
    # Basic SQL via SQLAlchemy
    result = sqlalchemy_conn.execute(text("SELECT COUNT(*) FROM users"))
    count = result.scalar()

    # Advanced operations via phoenixdb
    cursor = phoenixdb_conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
```