CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-phoenixdb

Python database adapter library for Apache Phoenix databases implementing DB API 2.0 and partial SQLAlchemy support

Overview
Eval results
Files

meta.md · docs/

Database Metadata

Access to Phoenix database metadata including catalogs, schemas, tables, columns, primary keys, and indexes through JDBC-compatible interface for database introspection and schema discovery.

Capabilities

Meta Object Creation

Database metadata interface is accessed through the connection's meta() method.

class Connection:
    def meta(self):
        """
        Create a metadata interface for database introspection.

        Returns:
        Meta: Database metadata object bound to this connection

        Raises:
        ProgrammingError: If the connection is closed
        """

Meta Class

Provides methods for querying database schema information in JDBC-compatible format.

class Meta:
    """JDBC-style metadata interface for Phoenix schema introspection."""

    def __init__(self, connection): ...

    def get_catalogs(self):
        """
        List the catalogs available in the database.

        Returns:
        list: Dictionaries of catalog metadata fields, one per catalog

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_schemas(self, catalog=None, schemaPattern=None):
        """
        List schemas matching the given filters.

        Parameters:
        - catalog (str, optional): Restrict results to this catalog
        - schemaPattern (str, optional): SQL LIKE pattern for schema names

        Returns:
        list: Dictionaries with schema fields such as TABLE_CATALOG and TABLE_SCHEM

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_tables(self, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
        """
        List tables matching the given filters.

        Parameters:
        - catalog (str, optional): Restrict results to this catalog
        - schemaPattern (str, optional): SQL LIKE pattern for schema names
        - tableNamePattern (str, optional): SQL LIKE pattern for table names
        - typeList (list, optional): Table types to include in the result

        Returns:
        list: Dictionaries with fields such as TABLE_CATALOG, TABLE_SCHEM,
              TABLE_NAME, and TABLE_TYPE

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_columns(self, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
        """
        List columns matching the given filters.

        Parameters:
        - catalog (str, optional): Restrict results to this catalog
        - schemaPattern (str, optional): SQL LIKE pattern for schema names
        - tableNamePattern (str, optional): SQL LIKE pattern for table names
        - columnNamePattern (str, optional): SQL LIKE pattern for column names

        Returns:
        list: Dictionaries with fields such as TABLE_CATALOG, TABLE_SCHEM,
              TABLE_NAME, COLUMN_NAME, DATA_TYPE, TYPE_NAME, COLUMN_SIZE,
              DECIMAL_DIGITS, NUM_PREC_RADIX, NULLABLE, REMARKS, and COLUMN_DEF

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_table_types(self):
        """
        List the table types the database supports.

        Returns:
        list: Dictionaries, each containing a TABLE_TYPE field

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_type_info(self):
        """
        Describe the data types the database supports.

        Returns:
        list: Dictionaries with fields such as TYPE_NAME, DATA_TYPE, PRECISION,
              LITERAL_PREFIX, LITERAL_SUFFIX, CREATE_PARAMS, NULLABLE,
              CASE_SENSITIVE, and SEARCHABLE

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_primary_keys(self, catalog=None, schema=None, table=None):
        """
        Describe the primary key of the specified table.

        Parameters:
        - catalog (str, optional): Catalog name
        - schema (str, optional): Schema name
        - table (str, optional): Table name

        Returns:
        list: Dictionaries with the standard JDBC fields (TABLE_CAT, TABLE_SCHEM,
              TABLE_NAME, COLUMN_NAME, KEY_SEQ, PK_NAME) plus Phoenix extensions
              (ASC_OR_DESC, DATA_TYPE, TYPE_NAME, COLUMN_SIZE, TYPE_ID,
              VIEW_CONSTANT)

        Raises:
        ProgrammingError: If the connection is closed
        """

    def get_index_info(self, catalog=None, schema=None, table=None, unique=False, approximate=False):
        """
        Describe the indexes on the specified table.

        Parameters:
        - catalog (str, optional): Catalog name
        - schema (str, optional): Schema name
        - table (str, optional): Table name
        - unique (bool): When True, report only unique indexes
        - approximate (bool): When True, allow approximate statistics

        Returns:
        list: Dictionaries with the standard JDBC fields (TABLE_CAT, TABLE_SCHEM,
              TABLE_NAME, NON_UNIQUE, INDEX_QUALIFIER, INDEX_NAME, TYPE,
              ORDINAL_POSITION, COLUMN_NAME, ASC_OR_DESC, CARDINALITY, PAGES,
              FILTER_CONDITION) plus Phoenix extensions (DATA_TYPE, TYPE_NAME,
              TYPE_ID, COLUMN_FAMILY, COLUMN_SIZE, ARRAY_SIZE)

        Raises:
        ProgrammingError: If the connection is closed
        """

Usage Examples

Basic Metadata Access

import phoenixdb

# Open a connection to the Phoenix Query Server and obtain its
# metadata interface.
conn = phoenixdb.connect('http://localhost:8765/')
meta = conn.meta()

# List every schema in the database.
for schema in meta.get_schemas():
    print(f"Schema: {schema['TABLE_SCHEM']}")

# List every table (no filters applied).
for table in meta.get_tables():
    print(f"Table: {table['TABLE_SCHEM']}.{table['TABLE_NAME']} ({table['TABLE_TYPE']})")

# List the columns of one specific table.
for column in meta.get_columns(schemaPattern='MY_SCHEMA', tableNamePattern='USERS'):
    print(f"Column: {column['COLUMN_NAME']} ({column['TYPE_NAME']})")

Schema Discovery

def discover_schema(connection):
    """
    Discover and print the complete database schema.

    Walks every schema, its tables, and each table's columns through the
    connection's metadata interface, printing an indented tree.

    Parameters:
    - connection: Open phoenixdb connection (must provide meta())

    Raises:
    ProgrammingError: If the connection is closed
    """
    meta = connection.meta()

    print("=== Database Schema Discovery ===")

    # Get all schemas
    schemas = meta.get_schemas()
    print(f"\nFound {len(schemas)} schemas:")

    for schema in schemas:
        schema_name = schema['TABLE_SCHEM'] or '<DEFAULT>'
        print(f"\nSchema: {schema_name}")

        # Get tables in this schema.
        # NOTE(review): if TABLE_SCHEM is None (default schema), passing None
        # as schemaPattern may match tables from every schema rather than only
        # the default one — confirm against the driver's pattern semantics.
        tables = meta.get_tables(schemaPattern=schema['TABLE_SCHEM'])
        print(f"  Tables ({len(tables)}):")

        for table in tables:
            table_name = table['TABLE_NAME']
            table_type = table['TABLE_TYPE']
            print(f"    {table_name} ({table_type})")

            # Get columns for this table
            columns = meta.get_columns(
                schemaPattern=schema['TABLE_SCHEM'],
                tableNamePattern=table_name
            )

            print(f"      Columns ({len(columns)}):")
            for column in columns:
                col_name = column['COLUMN_NAME']
                col_type = column['TYPE_NAME']
                col_size = column['COLUMN_SIZE']
                # JDBC NULLABLE is tri-state: 1 = nullable, 0 = not nullable,
                # 2 (or other) = unknown. The previous truthiness test
                # mislabelled the "unknown" (2) case as NULL; this now matches
                # the tri-state handling used in analyze_table.
                if column['NULLABLE'] == 1:
                    nullable = "NULL"
                elif column['NULLABLE'] == 0:
                    nullable = "NOT NULL"
                else:
                    nullable = "UNKNOWN"
                print(f"        {col_name} {col_type}({col_size}) {nullable}")

# Usage (requires the `conn` connection created in the Basic Metadata Access example)
discover_schema(conn)

Table Analysis

def analyze_table(connection, schema_name, table_name):
    """Print a structural report for one table: type, columns, PK, indexes.

    Parameters:
    - connection: Open phoenixdb connection (must provide meta())
    - schema_name: Schema containing the table
    - table_name: Name of the table to inspect
    """
    meta = connection.meta()

    print(f"=== Analyzing Table: {schema_name}.{table_name} ===")

    # Locate the table itself; bail out early if it does not exist.
    matches = meta.get_tables(schemaPattern=schema_name, tableNamePattern=table_name)
    if not matches:
        print("Table not found")
        return

    table_info = matches[0]
    print(f"Table Type: {table_info['TABLE_TYPE']}")
    if table_info.get('REMARKS'):
        print(f"Remarks: {table_info['REMARKS']}")

    # Column-by-column details.
    columns = meta.get_columns(schemaPattern=schema_name, tableNamePattern=table_name)

    print(f"\nColumns ({len(columns)}):")
    for column in columns:
        col_name = column['COLUMN_NAME']
        col_type = column['TYPE_NAME']
        col_size = column.get('COLUMN_SIZE', 'N/A')
        decimal_digits = column.get('DECIMAL_DIGITS')
        default_val = column.get('COLUMN_DEF')

        # JDBC tri-state nullability: 1 = nullable, 0 = not nullable, else unknown.
        if column['NULLABLE'] == 1:
            nullable_str = "NULL"
        elif column['NULLABLE'] == 0:
            nullable_str = "NOT NULL"
        else:
            nullable_str = "UNKNOWN"

        print(f"  {col_name}:")
        print(f"    Type: {col_type}")
        print(f"    Size: {col_size}")
        if decimal_digits is not None:
            print(f"    Decimal Digits: {decimal_digits}")
        print(f"    Nullable: {nullable_str}")
        if default_val:
            print(f"    Default: {default_val}")

    # Primary-key columns, ordered by their position in the key.
    primary_keys = meta.get_primary_keys(schema=schema_name, table=table_name)

    if primary_keys:
        print(f"\nPrimary Key:")
        for pk in sorted(primary_keys, key=lambda row: row['KEY_SEQ']):
            print(f"  {pk['COLUMN_NAME']} (sequence: {pk['KEY_SEQ']})")
        if primary_keys[0].get('PK_NAME'):
            print(f"  Constraint Name: {primary_keys[0]['PK_NAME']}")

    # Indexes, grouped by index name (the driver returns one row per
    # indexed column).
    index_rows = meta.get_index_info(schema=schema_name, table=table_name)

    if index_rows:
        print(f"\nIndexes:")
        grouped = {}
        for row in index_rows:
            grouped.setdefault(row['INDEX_NAME'], []).append(row)

        for idx_name, idx_columns in grouped.items():
            if idx_name:  # rows without an index name are table statistics
                if idx_columns[0]['NON_UNIQUE']:
                    unique = ""
                else:
                    unique = "UNIQUE"
                print(f"  {idx_name} {unique}")
                for idx_col in sorted(idx_columns, key=lambda row: row['ORDINAL_POSITION']):
                    col_name = idx_col['COLUMN_NAME']
                    order = "ASC" if idx_col.get('ASC_OR_DESC', 'A') == 'A' else "DESC"
                    print(f"    {col_name} {order}")

# Usage (reuses the `conn` connection from the Basic Metadata Access example)
analyze_table(conn, 'MY_SCHEMA', 'USERS')

Data Type Discovery

def show_supported_types(connection):
    """Print every data type the database reports, one entry per type.

    Parameters:
    - connection: Open phoenixdb connection (must provide meta())
    """
    meta = connection.meta()

    print("=== Supported Data Types ===")
    for type_data in meta.get_type_info():
        type_name = type_data['TYPE_NAME']
        data_type = type_data['DATA_TYPE']
        precision = type_data.get('PRECISION', 'N/A')

        # JDBC tri-state: 1 = nullable, 0 = no nulls, anything else unknown.
        if type_data['NULLABLE'] == 1:
            nullable_str = "NULLABLE"
        elif type_data['NULLABLE'] == 0:
            nullable_str = "NO NULLS"
        else:
            nullable_str = "UNKNOWN"

        print(f"{type_name}:")
        print(f"  JDBC Type Code: {data_type}")
        print(f"  Precision: {precision}")
        print(f"  Nullable: {nullable_str}")

        # Literal delimiters, when the type requires quoted literals.
        if type_data.get('LITERAL_PREFIX'):
            print(f"  Literal Prefix: {type_data['LITERAL_PREFIX']}")
        if type_data.get('LITERAL_SUFFIX'):
            print(f"  Literal Suffix: {type_data['LITERAL_SUFFIX']}")

        print()

# Usage (reuses the `conn` connection from the Basic Metadata Access example)
show_supported_types(conn)

Schema Comparison

def compare_schemas(connection, schema1, schema2):
    """Report table and column differences between two schemas.

    Parameters:
    - connection: Open phoenixdb connection (must provide meta())
    - schema1: First schema name
    - schema2: Second schema name
    """
    meta = connection.meta()

    print(f"=== Comparing Schemas: {schema1} vs {schema2} ===")

    # Collect the table names present in each schema.
    tables1 = {t['TABLE_NAME'] for t in meta.get_tables(schemaPattern=schema1)}
    tables2 = {t['TABLE_NAME'] for t in meta.get_tables(schemaPattern=schema2)}

    # Set algebra yields the three partitions directly.
    only_in_schema1 = tables1 - tables2
    only_in_schema2 = tables2 - tables1
    common_tables = tables1 & tables2

    if only_in_schema1:
        print(f"\nTables only in {schema1}:")
        for table in sorted(only_in_schema1):
            print(f"  {table}")

    if only_in_schema2:
        print(f"\nTables only in {schema2}:")
        for table in sorted(only_in_schema2):
            print(f"  {table}")

    print(f"\nCommon tables ({len(common_tables)}):")
    for table in sorted(common_tables):
        print(f"  {table}")

        # Column-level diff for tables present in both schemas.
        cols1_names = {col['COLUMN_NAME'] for col in meta.get_columns(schemaPattern=schema1, tableNamePattern=table)}
        cols2_names = {col['COLUMN_NAME'] for col in meta.get_columns(schemaPattern=schema2, tableNamePattern=table)}

        col_diff = cols1_names - cols2_names
        if col_diff:
            print(f"    Columns only in {schema1}: {', '.join(sorted(col_diff))}")

        col_diff = cols2_names - cols1_names
        if col_diff:
            print(f"    Columns only in {schema2}: {', '.join(sorted(col_diff))}")

# Usage (reuses the `conn` connection from the Basic Metadata Access example)
compare_schemas(conn, 'PROD_SCHEMA', 'TEST_SCHEMA')

Phoenix-Specific Metadata

Phoenix extends standard JDBC metadata with additional fields:

def show_phoenix_extensions(connection, schema_name, table_name):
    """Print the Phoenix-specific fields attached to PK and index metadata.

    Parameters:
    - connection: Open phoenixdb connection (must provide meta())
    - schema_name: Schema containing the table
    - table_name: Table whose metadata extensions to display
    """
    meta = connection.meta()

    # Phoenix adds sort order, type, and view-constant fields to each
    # primary-key column row.
    primary_keys = meta.get_primary_keys(schema=schema_name, table=table_name)
    if primary_keys:
        print("Phoenix Primary Key Extensions:")
        for pk in primary_keys:
            print(f"  Column: {pk['COLUMN_NAME']}")
            print(f"  ASC_OR_DESC: {pk.get('ASC_OR_DESC')}")
            print(f"  DATA_TYPE: {pk.get('DATA_TYPE')}")
            print(f"  TYPE_ID: {pk.get('TYPE_ID')}")
            print(f"  VIEW_CONSTANT: {pk.get('VIEW_CONSTANT')}")

    # Index rows carry column-family and array-size extensions; rows
    # without an index name are table statistics and are skipped.
    indexes = meta.get_index_info(schema=schema_name, table=table_name)
    if indexes:
        print("\nPhoenix Index Extensions:")
        for idx in indexes:
            if not idx['INDEX_NAME']:
                continue
            print(f"  Index: {idx['INDEX_NAME']}")
            print(f"  Column Family: {idx.get('COLUMN_FAMILY')}")
            print(f"  Array Size: {idx.get('ARRAY_SIZE')}")

# Usage (reuses the `conn` connection from the Basic Metadata Access example)
show_phoenix_extensions(conn, 'MY_SCHEMA', 'MY_TABLE')

Install with Tessl CLI

npx tessl i tessl/pypi-phoenixdb

docs

connection.md

cursor.md

errors.md

index.md

meta.md

sqlalchemy.md

types.md

tile.json