CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-phoenixdb

Python database adapter library for Apache Phoenix databases implementing DB API 2.0 and partial SQLAlchemy support

Overview
Eval results
Files

docs/sqlalchemy.md

SQLAlchemy Integration

Partial SQLAlchemy dialect implementation supporting textual SQL execution and basic database operations within SQLAlchemy applications, providing integration with the SQLAlchemy ecosystem while maintaining Phoenix-specific functionality.

Capabilities

Phoenix Dialect

SQLAlchemy dialect for Phoenix database connectivity through phoenixdb.

class PhoenixDialect(DefaultDialect):
    """SQLAlchemy dialect for connecting to Phoenix through phoenixdb.

    The integration is basic: it covers textual SQL execution and
    connection management.
    """

    #: Dialect name for SQLAlchemy registration.
    name = "phoenix"

    #: Driver name used by SQLAlchemy.
    driver = "phoenixdb"

Engine Creation

def create_engine(url, **kwargs):
    """Create a SQLAlchemy engine for a Phoenix connection.

    The URL has the form ``phoenix://host:port[/path][?parameters]``.

    Args:
        url (str): Connection URL in Phoenix format.
        **kwargs: Standard SQLAlchemy engine parameters. This includes
            ``connect_args`` (dict), whose entries are additional
            ``phoenixdb.connect()`` parameters.

    Returns:
        Engine: SQLAlchemy engine instance.
    """

Execution Context

class PhoenixExecutionContext(DefaultExecutionContext):
    """Execution context used for SQLAlchemy operations against Phoenix."""

    def should_autocommit_text(self, statement):
        """Tell whether a textual statement should be autocommitted.

        Args:
            statement (str): SQL statement text.

        Returns:
            bool: True if the statement requires autocommit (DDL/DML
            operations).
        """

DDL Compiler

class PhoenixDDLCompiler(DDLCompiler):
    """DDL compiler that generates Phoenix-specific SQL."""

    def visit_primary_key_constraint(self, constraint):
        """Compile a primary key constraint.

        Args:
            constraint: SQLAlchemy PrimaryKeyConstraint.

        Returns:
            str: Phoenix-compatible PRIMARY KEY clause.

        Raises:
            CompileError: If the constraint has no name (required by
                Phoenix).
        """

Usage Examples

Basic Engine Setup

# text() is used below, so it must be imported alongside create_engine.
from sqlalchemy import create_engine, text

# Basic connection
engine = create_engine('phoenix://localhost:8765')

# With connection arguments (additional phoenixdb.connect() parameters)
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'autocommit': True,
        'authentication': 'BASIC',
        'avatica_user': 'username',
        'avatica_password': 'password',
    },
)

# Test connection: run a trivial textual statement and print the first row
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())

URL Configuration

from sqlalchemy import create_engine

# Basic URL
engine = create_engine('phoenix://localhost:8765')

# With HTTPS: a phoenix:// URL cannot carry the http/https scheme, so the
# dialect's tls=True argument selects https; 'verify' names the certificate
# file used to validate the server.
engine = create_engine(
    'phoenix://secure-host:8765',
    tls=True,
    connect_args={'verify': '/path/to/cert.pem'},
)

# Authentication options are passed through connect_args
url = 'phoenix://localhost:8765'
engine = create_engine(url, connect_args={
    'authentication': 'SPNEGO',
    'truststore': '/path/to/truststore.pem',
})

Textual SQL Execution

The Phoenix dialect primarily supports textual SQL execution:

from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

with engine.connect() as conn:
    # Create table
    conn.execute(text("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username VARCHAR,
            email VARCHAR
        )
    """))

    # text() expects named bind parameters (:name) supplied as a dict;
    # "?" placeholders with tuples are not understood by the text() construct.
    upsert_user = text("UPSERT INTO users VALUES (:id, :username, :email)")

    # Insert data
    conn.execute(
        upsert_user,
        {'id': 1, 'username': 'admin', 'email': 'admin@example.com'},
    )

    # Query data
    result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {'id': 1})
    user = result.fetchone()
    print(f"User: {user}")

    # Bulk operations: a list of parameter dicts executes the statement
    # once per entry
    users_data = [
        {'id': 2, 'username': 'john', 'email': 'john@example.com'},
        {'id': 3, 'username': 'jane', 'email': 'jane@example.com'},
    ]
    conn.execute(upsert_user, users_data)

    # Query all users. Rows are unpacked positionally (declaration order)
    # because Phoenix upper-cases unquoted identifiers, so lower-case
    # attribute access such as row.id is not reliable.
    result = conn.execute(text("SELECT * FROM users ORDER BY id"))
    for user_id, username, email in result:
        print(f"ID: {user_id}, Username: {username}, Email: {email}")

Connection Pool Configuration

# text() is used by execute_query, so it must be imported as well.
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    'phoenix://localhost:8765',
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    connect_args={
        'autocommit': True,
        'max_retries': 3,
    },
)


# Use the pooled connections
def execute_query(sql, params=None):
    """Run a textual SQL statement on a pooled connection.

    Parameters:
    - sql (str): SQL text; use named bind parameters (:name)
    - params (dict): Values for the named bind parameters, or None

    Returns:
    list: All rows for row-returning statements, otherwise None
    """
    with engine.connect() as conn:
        result = conn.execute(text(sql), params or {})
        # Fetch inside the block: once the connection goes back to the pool
        # the result can no longer be read safely.
        return result.fetchall() if result.returns_rows else None


# Multiple operations will reuse pooled connections
result1 = execute_query("SELECT COUNT(*) FROM users")
result2 = execute_query("SELECT * FROM users WHERE id = :id", {'id': 1})

Transaction Management

from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

# Automatic transaction management
with engine.begin() as conn:
    # All operations in single transaction.
    # Phoenix has no INSERT statement; rows are written with UPSERT.
    # text() takes named bind parameters (:name) supplied as a dict.
    conn.execute(
        text("UPSERT INTO audit_log VALUES (:id, :event, :event_date)"),
        {'id': 1, 'event': 'user_created', 'event_date': '2023-01-01'},
    )

    # Name the target columns explicitly when not every column gets a value
    conn.execute(
        text("UPSERT INTO users (id, username) VALUES (:id, :username)"),
        {'id': 1, 'username': 'new_user'},
    )

    # Automatic commit on successful completion
    # Automatic rollback on exception

# Manual transaction control
with engine.connect() as conn:
    trans = conn.begin()
    try:
        conn.execute(text("DELETE FROM temp_table"))
        # UPSERT ... SELECT is Phoenix's form of INSERT ... SELECT
        conn.execute(text("UPSERT INTO temp_table SELECT * FROM source_table"))
        trans.commit()
    except Exception as e:
        print(f"Error: {e}")
        trans.rollback()

Metadata Inspection

from sqlalchemy import create_engine, inspect

engine = create_engine('phoenix://localhost:8765')
inspector = inspect(engine)

# Note: Limited metadata support in Phoenix dialect
try:
    # Get table names
    table_names = inspector.get_table_names()
    print(f"Tables: {table_names}")

    # Get schema names
    schema_names = inspector.get_schema_names()
    print(f"Schemas: {schema_names}")

except NotImplementedError:
    print("Metadata introspection not fully supported")

    # Use direct phoenixdb metadata instead
    raw_conn = engine.raw_connection()
    try:
        phoenixdb_conn = raw_conn.connection  # Get underlying phoenixdb connection
        meta = phoenixdb_conn.meta()

        for table in meta.get_tables():
            print(f"Table: {table['TABLE_SCHEM']}.{table['TABLE_NAME']}")
    finally:
        # Hand the checked-out connection back to the pool; without this
        # the raw connection is leaked.
        raw_conn.close()

Authentication Configuration

from requests.auth import HTTPBasicAuth
from sqlalchemy import create_engine

# Basic authentication
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'authentication': 'BASIC',
        'avatica_user': 'username',
        'avatica_password': 'password',
    },
)

# SPNEGO/Kerberos authentication against an HTTPS endpoint.
# A phoenix:// URL cannot carry the http/https scheme, so the dialect's
# tls=True argument selects https; 'verify' names the trust store used to
# validate the server certificate.
engine = create_engine(
    'phoenix://secure-host:8765',
    tls=True,
    connect_args={
        'authentication': 'SPNEGO',
        'verify': '/path/to/truststore.pem',
    },
)

# Custom authentication using requests.auth
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'auth': HTTPBasicAuth('username', 'password'),
    },
)

Error Handling

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import phoenixdb

engine = create_engine('phoenix://localhost:8765')

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM nonexistent_table"))

except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

    # Access underlying phoenixdb exception. Only errors that wrap a driver
    # exception carry .orig, so look it up defensively instead of letting
    # the handler itself fail with AttributeError.
    orig = getattr(e, 'orig', None)
    if hasattr(orig, 'message'):
        print(f"Phoenix error: {orig.message}")
        if hasattr(orig, 'sqlstate'):
            print(f"SQL state: {orig.sqlstate}")

except phoenixdb.Error as e:
    print(f"phoenixdb error: {e.message}")

Phoenix-Specific Features

from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

with engine.connect() as conn:
    # Phoenix UPSERT operations
    conn.execute(text("""
        UPSERT INTO users (id, username)
        VALUES (1, 'updated_user')
    """))

    # Array columns
    conn.execute(text("""
        CREATE TABLE test_arrays (
            id INTEGER PRIMARY KEY,
            numbers INTEGER ARRAY
        )
    """))

    # text() takes named bind parameters (:name) supplied as a dict;
    # the Python list is bound to the array column.
    conn.execute(
        text("UPSERT INTO test_arrays VALUES (:id, :numbers)"),
        {'id': 1, 'numbers': [1, 2, 3, 4, 5]},
    )

    # Phoenix functions and operators.
    # Array membership is written as "value = ANY(array)" in Phoenix.
    result = conn.execute(text("""
        SELECT id, ARRAY_LENGTH(numbers) as array_len
        FROM test_arrays
        WHERE 3 = ANY(numbers)
    """))

    # Unpack positionally: Phoenix upper-cases unquoted identifiers, so
    # lower-case attribute access (row.id, row.array_len) is not reliable.
    for row_id, array_len in result:
        print(f"ID: {row_id}, Array Length: {array_len}")

    # Phoenix-specific data types
    conn.execute(text("""
        CREATE TABLE phoenix_types (
            id UNSIGNED_INT PRIMARY KEY,
            big_num UNSIGNED_LONG,
            precise_decimal DECIMAL(20,10)
        )
    """))

Limitations

The Phoenix SQLAlchemy dialect is incomplete and primarily supports:

  • Textual SQL execution via text() construct
  • Basic connection management and pooling
  • Transaction support for Phoenix ACID operations
  • Parameter binding for prepared statements

Not supported:

  • SQLAlchemy ORM (Object-Relational Mapping)
  • Core Table/Column abstraction
  • Schema migration tools (Alembic)
  • Advanced SQLAlchemy features (joins, subqueries via Core)

For full Phoenix functionality, use phoenixdb directly. The two can be combined: SQLAlchemy for basic operations and phoenixdb for the use cases the dialect does not cover.

# Recommended approach for complex applications
from sqlalchemy import create_engine, text
import phoenixdb
import phoenixdb.cursor  # explicit import for phoenixdb.cursor.DictCursor

# Use SQLAlchemy for basic operations
engine = create_engine('phoenix://localhost:8765')

# Combine both as needed
with engine.connect() as sqlalchemy_conn:
    # Basic SQL via SQLAlchemy
    result = sqlalchemy_conn.execute(text("SELECT COUNT(*) FROM users"))
    count = result.scalar()

# Use phoenixdb directly for advanced features
phoenixdb_conn = phoenixdb.connect('http://localhost:8765/')
try:
    meta = phoenixdb_conn.meta()
    tables = meta.get_tables()

    # Advanced operations via phoenixdb
    cursor = phoenixdb_conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
finally:
    # Close the direct connection once done so it is not leaked
    phoenixdb_conn.close()

Install with Tessl CLI

npx tessl i tessl/pypi-phoenixdb

docs

connection.md

cursor.md

errors.md

index.md

meta.md

sqlalchemy.md

types.md

tile.json