Python database adapter library for Apache Phoenix databases implementing DB API 2.0 and partial SQLAlchemy support
Partial SQLAlchemy dialect implementation supporting textual SQL execution and basic database operations within SQLAlchemy applications, providing integration with the SQLAlchemy ecosystem while maintaining Phoenix-specific functionality.
SQLAlchemy dialect for Phoenix database connectivity through phoenixdb.
class PhoenixDialect(DefaultDialect):
"""
Phoenix dialect for SQLAlchemy.
Provides basic SQLAlchemy integration for Phoenix databases with support for
textual SQL execution and connection management.
"""
name = "phoenix"
"""Dialect name for SQLAlchemy registration."""
driver = "phoenixdb"
"""Driver name used by SQLAlchemy."""

def create_engine(url, **kwargs):
"""
Creates SQLAlchemy engine for Phoenix connections.
URL format: phoenix://host:port[/path][?parameters]
Parameters:
- url (str): Connection URL in Phoenix format
- connect_args (dict): Additional phoenixdb.connect() parameters
- **kwargs: Standard SQLAlchemy engine parameters
Returns:
Engine: SQLAlchemy engine instance
"""

class PhoenixExecutionContext(DefaultExecutionContext):
"""
Phoenix-specific execution context for SQLAlchemy operations.
"""
def should_autocommit_text(self, statement):
"""
Determines if statement should be autocommitted.
Parameters:
- statement (str): SQL statement text
Returns:
bool: True if statement requires autocommit (DDL/DML operations)
"""

class PhoenixDDLCompiler(DDLCompiler):
"""
DDL compiler for Phoenix-specific SQL generation.
"""
def visit_primary_key_constraint(self, constraint):
"""
Compiles primary key constraints.
Parameters:
- constraint: SQLAlchemy PrimaryKeyConstraint
Returns:
str: Phoenix-compatible PRIMARY KEY clause
Raises:
CompileError: If constraint has no name (required by Phoenix)
"""

from sqlalchemy import create_engine, text
# Basic connection
engine = create_engine('phoenix://localhost:8765')
# With connection arguments
engine = create_engine(
'phoenix://localhost:8765',
connect_args={
'autocommit': True,
'authentication': 'BASIC',
'avatica_user': 'username',
'avatica_password': 'password'
}
)
# Test connection
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
print(result.fetchone())

from sqlalchemy import create_engine
# Basic URL
engine = create_engine('phoenix://localhost:8765')
# With HTTPS
engine = create_engine('phoenix://secure-host:8765',
connect_args={'verify': '/path/to/cert.pem'})
# URL parameters (alternative to connect_args)
url = 'phoenix://localhost:8765'
engine = create_engine(url, connect_args={
'authentication': 'SPNEGO',
'truststore': '/path/to/truststore.pem'
})

The Phoenix dialect primarily supports textual SQL execution:
from sqlalchemy import create_engine, text
engine = create_engine('phoenix://localhost:8765')
with engine.connect() as conn:
# Create table
conn.execute(text("""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
username VARCHAR,
email VARCHAR
)
"""))
# Insert data
conn.execute(text("UPSERT INTO users VALUES (?, ?, ?)"),
(1, 'admin', 'admin@example.com'))
# Query data
result = conn.execute(text("SELECT * FROM users WHERE id = ?"), (1,))
user = result.fetchone()
print(f"User: {user}")
# Bulk operations
users_data = [
(2, 'john', 'john@example.com'),
(3, 'jane', 'jane@example.com')
]
conn.execute(text("UPSERT INTO users VALUES (?, ?, ?)"), users_data)
# Query all users
result = conn.execute(text("SELECT * FROM users ORDER BY id"))
for row in result:
print(f"ID: {row.id}, Username: {row.username}, Email: {row.email}")

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
# Configure connection pooling
engine = create_engine(
'phoenix://localhost:8765',
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
connect_args={
'autocommit': True,
'max_retries': 3
}
)
# Use the pooled connections
def execute_query(sql, params=None):
with engine.connect() as conn:
if params:
return conn.execute(text(sql), params)
else:
return conn.execute(text(sql))
# Multiple operations will reuse pooled connections
result1 = execute_query("SELECT COUNT(*) FROM users")
result2 = execute_query("SELECT * FROM users WHERE id = ?", (1,))

from sqlalchemy import create_engine, text
engine = create_engine('phoenix://localhost:8765')
# Automatic transaction management
with engine.begin() as conn:
# All operations in single transaction
conn.execute(text("UPSERT INTO audit_log VALUES (?, ?, ?)"),
(1, 'user_created', '2023-01-01'))
conn.execute(text("UPSERT INTO users VALUES (?, ?)"),
(1, 'new_user'))
# Automatic commit on successful completion
# Automatic rollback on exception
# Manual transaction control
with engine.connect() as conn:
trans = conn.begin()
try:
conn.execute(text("DELETE FROM temp_table"))
conn.execute(text("UPSERT INTO temp_table SELECT * FROM source_table"))
trans.commit()
except Exception as e:
print(f"Error: {e}")
trans.rollback()

from sqlalchemy import create_engine, inspect
engine = create_engine('phoenix://localhost:8765')
inspector = inspect(engine)
# Note: Limited metadata support in Phoenix dialect
try:
# Get table names
table_names = inspector.get_table_names()
print(f"Tables: {table_names}")
# Get schema names
schema_names = inspector.get_schema_names()
print(f"Schemas: {schema_names}")
except NotImplementedError:
print("Metadata introspection not fully supported")
# Use direct phoenixdb metadata instead
raw_conn = engine.raw_connection()
phoenixdb_conn = raw_conn.connection # Get underlying phoenixdb connection
meta = phoenixdb_conn.meta()
tables = meta.get_tables()
for table in tables:
print(f"Table: {table['TABLE_SCHEM']}.{table['TABLE_NAME']}")

from sqlalchemy import create_engine
# Basic authentication
engine = create_engine(
'phoenix://localhost:8765',
connect_args={
'authentication': 'BASIC',
'avatica_user': 'username',
'avatica_password': 'password'
}
)
# SPNEGO/Kerberos authentication
engine = create_engine(
'phoenix://secure-host:8765',
connect_args={
'authentication': 'SPNEGO',
'verify': '/path/to/truststore.pem'
}
)
# Custom authentication using requests.auth
from requests.auth import HTTPBasicAuth
engine = create_engine(
'phoenix://localhost:8765',
connect_args={
'auth': HTTPBasicAuth('username', 'password')
}
)

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import phoenixdb
engine = create_engine('phoenix://localhost:8765')
try:
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM nonexistent_table"))
except SQLAlchemyError as e:
print(f"SQLAlchemy error: {e}")
# Access underlying phoenixdb exception
if hasattr(e.orig, 'message'):
print(f"Phoenix error: {e.orig.message}")
if hasattr(e.orig, 'sqlstate'):
print(f"SQL state: {e.orig.sqlstate}")
except phoenixdb.Error as e:
print(f"phoenixdb error: {e.message}")

from sqlalchemy import create_engine, text
engine = create_engine('phoenix://localhost:8765')
with engine.connect() as conn:
# Phoenix UPSERT operations
conn.execute(text("""
UPSERT INTO users (id, username)
VALUES (1, 'updated_user')
"""))
# Array columns
conn.execute(text("""
CREATE TABLE test_arrays (
id INTEGER PRIMARY KEY,
numbers INTEGER ARRAY
)
"""))
conn.execute(text("UPSERT INTO test_arrays VALUES (?, ?)"),
(1, [1, 2, 3, 4, 5]))
# Phoenix functions and operators
result = conn.execute(text("""
SELECT id, ARRAY_LENGTH(numbers) as array_len
FROM test_arrays
WHERE ARRAY_CONTAINS(numbers, 3)
"""))
for row in result:
print(f"ID: {row.id}, Array Length: {row.array_len}")
# Phoenix-specific data types
conn.execute(text("""
CREATE TABLE phoenix_types (
id UNSIGNED_INT PRIMARY KEY,
big_num UNSIGNED_LONG,
precise_decimal DECIMAL(20,10)
)
"""))

The Phoenix SQLAlchemy dialect is incomplete and primarily supports:
text() construct

Not supported:
For full Phoenix functionality, use phoenixdb directly, and combine it with SQLAlchemy only for the use cases the dialect supports.
# Recommended approach for complex applications
from sqlalchemy import create_engine, text
import phoenixdb
# Use SQLAlchemy for basic operations
engine = create_engine('phoenix://localhost:8765')
# Use phoenixdb directly for advanced features
phoenixdb_conn = phoenixdb.connect('http://localhost:8765/')
meta = phoenixdb_conn.meta()
tables = meta.get_tables()
# Combine both as needed
with engine.connect() as sqlalchemy_conn:
# Basic SQL via SQLAlchemy
result = sqlalchemy_conn.execute(text("SELECT COUNT(*) FROM users"))
count = result.scalar()
# Advanced operations via phoenixdb
cursor = phoenixdb_conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()

Install with Tessl CLI
npx tessl i tessl/pypi-phoenixdb