phoenixdb is a Python database adapter for Apache Phoenix that implements the DB API 2.0 specification and provides partial SQLAlchemy support.

This page describes access to Phoenix database metadata — catalogs, schemas, tables, columns, primary keys, and indexes — through a JDBC-compatible interface for database introspection and schema discovery.
The database metadata interface is obtained through the connection's meta() method.
class Connection:
    """Phoenix database connection (metadata-related excerpt)."""

    def meta(self):
        """
        Create a metadata interface for database introspection.

        Provides methods for querying database schema information in a
        JDBC-compatible format.

        Returns:
            Meta: Database metadata object.

        Raises:
            ProgrammingError: If the connection is closed.
        """
class Meta:
    """Database metadata interface for schema introspection.

    Obtained via ``Connection.meta()``. All query methods return lists of
    dictionaries keyed by JDBC metadata column names (TABLE_SCHEM,
    TABLE_NAME, COLUMN_NAME, ...).
    """

    def __init__(self, connection): ...

    def get_catalogs(self):
        """
        Return the catalogs available in the database.

        Returns:
            list: List of dictionaries with catalog information.
                Each dict contains catalog metadata fields.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_schemas(self, catalog=None, schemaPattern=None):
        """
        Return the schemas matching the given criteria.

        Parameters:
            catalog (str, optional): Catalog name filter.
            schemaPattern (str, optional): Schema name pattern (SQL LIKE pattern).

        Returns:
            list: List of dictionaries with schema information.
                Each dict contains: TABLE_CATALOG, TABLE_SCHEM, etc.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_tables(self, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
        """
        Return the tables matching the specified criteria.

        Parameters:
            catalog (str, optional): Catalog name filter.
            schemaPattern (str, optional): Schema name pattern.
            tableNamePattern (str, optional): Table name pattern.
            typeList (list, optional): List of table types to include.

        Returns:
            list: List of dictionaries with table information.
                Each dict contains: TABLE_CATALOG, TABLE_SCHEM, TABLE_NAME,
                TABLE_TYPE, etc.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_columns(self, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
        """
        Return the columns matching the specified criteria.

        Parameters:
            catalog (str, optional): Catalog name filter.
            schemaPattern (str, optional): Schema name pattern.
            tableNamePattern (str, optional): Table name pattern.
            columnNamePattern (str, optional): Column name pattern.

        Returns:
            list: List of dictionaries with column information.
                Each dict contains: TABLE_CATALOG, TABLE_SCHEM, TABLE_NAME,
                COLUMN_NAME, DATA_TYPE, TYPE_NAME, COLUMN_SIZE,
                DECIMAL_DIGITS, NUM_PREC_RADIX, NULLABLE, REMARKS,
                COLUMN_DEF, etc.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_table_types(self):
        """
        Return the table types available in the database.

        Returns:
            list: List of dictionaries with table type information.
                Each dict contains: TABLE_TYPE.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_type_info(self):
        """
        Return information about the supported data types.

        Returns:
            list: List of dictionaries with data type information.
                Each dict contains: TYPE_NAME, DATA_TYPE, PRECISION,
                LITERAL_PREFIX, LITERAL_SUFFIX, CREATE_PARAMS, NULLABLE,
                CASE_SENSITIVE, SEARCHABLE, etc.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_primary_keys(self, catalog=None, schema=None, table=None):
        """
        Return primary key information for the specified table.

        Parameters:
            catalog (str, optional): Catalog name.
            schema (str, optional): Schema name.
            table (str, optional): Table name.

        Returns:
            list: List of dictionaries with primary key information.
                Each dict contains: TABLE_CAT, TABLE_SCHEM, TABLE_NAME,
                COLUMN_NAME, KEY_SEQ, PK_NAME, plus Phoenix extensions:
                ASC_OR_DESC, DATA_TYPE, TYPE_NAME, COLUMN_SIZE, TYPE_ID,
                VIEW_CONSTANT.

        Raises:
            ProgrammingError: If the connection is closed.
        """

    def get_index_info(self, catalog=None, schema=None, table=None, unique=False, approximate=False):
        """
        Return index information for the specified table.

        Parameters:
            catalog (str, optional): Catalog name.
            schema (str, optional): Schema name.
            table (str, optional): Table name.
            unique (bool): Include only unique indexes if True.
            approximate (bool): Allow approximate information.

        Returns:
            list: List of dictionaries with index information.
                Each dict contains: TABLE_CAT, TABLE_SCHEM, TABLE_NAME,
                NON_UNIQUE, INDEX_QUALIFIER, INDEX_NAME, TYPE,
                ORDINAL_POSITION, COLUMN_NAME, ASC_OR_DESC, CARDINALITY,
                PAGES, FILTER_CONDITION, plus Phoenix extensions:
                DATA_TYPE, TYPE_NAME, TYPE_ID, COLUMN_FAMILY, COLUMN_SIZE,
                ARRAY_SIZE.

        Raises:
            ProgrammingError: If the connection is closed.
        """
# Quickstart: connect and explore the database metadata.
import phoenixdb

conn = phoenixdb.connect('http://localhost:8765/')
meta = conn.meta()

# Get all schemas
schemas = meta.get_schemas()
for schema in schemas:
    print(f"Schema: {schema['TABLE_SCHEM']}")

# Get all tables
tables = meta.get_tables()
for table in tables:
    print(f"Table: {table['TABLE_SCHEM']}.{table['TABLE_NAME']} ({table['TABLE_TYPE']})")

# Get columns for a specific table
columns = meta.get_columns(schemaPattern='MY_SCHEMA', tableNamePattern='USERS')
for column in columns:
    print(f"Column: {column['COLUMN_NAME']} ({column['TYPE_NAME']})")
"""Discover and print complete database schema."""
meta = connection.meta()
print("=== Database Schema Discovery ===")
# Get all schemas
schemas = meta.get_schemas()
print(f"\nFound {len(schemas)} schemas:")
for schema in schemas:
schema_name = schema['TABLE_SCHEM'] or '<DEFAULT>'
print(f"\nSchema: {schema_name}")
# Get tables in this schema
tables = meta.get_tables(schemaPattern=schema['TABLE_SCHEM'])
print(f" Tables ({len(tables)}):")
for table in tables:
table_name = table['TABLE_NAME']
table_type = table['TABLE_TYPE']
print(f" {table_name} ({table_type})")
# Get columns for this table
columns = meta.get_columns(
schemaPattern=schema['TABLE_SCHEM'],
tableNamePattern=table_name
)
print(f" Columns ({len(columns)}):")
for column in columns:
col_name = column['COLUMN_NAME']
col_type = column['TYPE_NAME']
col_size = column['COLUMN_SIZE']
nullable = "NULL" if column['NULLABLE'] else "NOT NULL"
print(f" {col_name} {col_type}({col_size}) {nullable}")
# Usage
discover_schema(conn)def analyze_table(connection, schema_name, table_name):
"""Analyze specific table structure and constraints."""
meta = connection.meta()
print(f"=== Analyzing Table: {schema_name}.{table_name} ===")
# Get table information
tables = meta.get_tables(
schemaPattern=schema_name,
tableNamePattern=table_name
)
if not tables:
print("Table not found")
return
table_info = tables[0]
print(f"Table Type: {table_info['TABLE_TYPE']}")
if table_info.get('REMARKS'):
print(f"Remarks: {table_info['REMARKS']}")
# Get column details
columns = meta.get_columns(
schemaPattern=schema_name,
tableNamePattern=table_name
)
print(f"\nColumns ({len(columns)}):")
for column in columns:
col_name = column['COLUMN_NAME']
col_type = column['TYPE_NAME']
col_size = column.get('COLUMN_SIZE', 'N/A')
decimal_digits = column.get('DECIMAL_DIGITS')
nullable = column['NULLABLE']
default_val = column.get('COLUMN_DEF')
nullable_str = "NULL" if nullable == 1 else "NOT NULL" if nullable == 0 else "UNKNOWN"
print(f" {col_name}:")
print(f" Type: {col_type}")
print(f" Size: {col_size}")
if decimal_digits is not None:
print(f" Decimal Digits: {decimal_digits}")
print(f" Nullable: {nullable_str}")
if default_val:
print(f" Default: {default_val}")
# Get primary key information
primary_keys = meta.get_primary_keys(
schema=schema_name,
table=table_name
)
if primary_keys:
print(f"\nPrimary Key:")
pk_columns = sorted(primary_keys, key=lambda x: x['KEY_SEQ'])
for pk in pk_columns:
print(f" {pk['COLUMN_NAME']} (sequence: {pk['KEY_SEQ']})")
if primary_keys[0].get('PK_NAME'):
print(f" Constraint Name: {primary_keys[0]['PK_NAME']}")
# Get index information
indexes = meta.get_index_info(
schema=schema_name,
table=table_name
)
if indexes:
print(f"\nIndexes:")
index_groups = {}
for idx in indexes:
idx_name = idx['INDEX_NAME']
if idx_name not in index_groups:
index_groups[idx_name] = []
index_groups[idx_name].append(idx)
for idx_name, idx_columns in index_groups.items():
if idx_name: # Skip table statistics
unique = "UNIQUE" if not idx_columns[0]['NON_UNIQUE'] else ""
print(f" {idx_name} {unique}")
sorted_columns = sorted(idx_columns, key=lambda x: x['ORDINAL_POSITION'])
for idx_col in sorted_columns:
col_name = idx_col['COLUMN_NAME']
asc_desc = idx_col.get('ASC_OR_DESC', 'A')
order = "ASC" if asc_desc == 'A' else "DESC"
print(f" {col_name} {order}")
# Usage
analyze_table(conn, 'MY_SCHEMA', 'USERS')def show_supported_types(connection):
"""Display all supported data types."""
meta = connection.meta()
type_info = meta.get_type_info()
print("=== Supported Data Types ===")
for type_data in type_info:
type_name = type_data['TYPE_NAME']
data_type = type_data['DATA_TYPE']
precision = type_data.get('PRECISION', 'N/A')
nullable = type_data['NULLABLE']
nullable_str = "NULLABLE" if nullable == 1 else "NO NULLS" if nullable == 0 else "UNKNOWN"
print(f"{type_name}:")
print(f" JDBC Type Code: {data_type}")
print(f" Precision: {precision}")
print(f" Nullable: {nullable_str}")
if type_data.get('LITERAL_PREFIX'):
print(f" Literal Prefix: {type_data['LITERAL_PREFIX']}")
if type_data.get('LITERAL_SUFFIX'):
print(f" Literal Suffix: {type_data['LITERAL_SUFFIX']}")
print()
# Usage
show_supported_types(conn)def compare_schemas(connection, schema1, schema2):
"""Compare two schemas for differences."""
meta = connection.meta()
print(f"=== Comparing Schemas: {schema1} vs {schema2} ===")
# Get tables from both schemas
tables1 = set()
for table in meta.get_tables(schemaPattern=schema1):
tables1.add(table['TABLE_NAME'])
tables2 = set()
for table in meta.get_tables(schemaPattern=schema2):
tables2.add(table['TABLE_NAME'])
# Find differences
only_in_schema1 = tables1 - tables2
only_in_schema2 = tables2 - tables1
common_tables = tables1 & tables2
if only_in_schema1:
print(f"\nTables only in {schema1}:")
for table in sorted(only_in_schema1):
print(f" {table}")
if only_in_schema2:
print(f"\nTables only in {schema2}:")
for table in sorted(only_in_schema2):
print(f" {table}")
print(f"\nCommon tables ({len(common_tables)}):")
for table in sorted(common_tables):
print(f" {table}")
# Compare column structures
cols1 = meta.get_columns(schemaPattern=schema1, tableNamePattern=table)
cols2 = meta.get_columns(schemaPattern=schema2, tableNamePattern=table)
cols1_names = {col['COLUMN_NAME']: col for col in cols1}
cols2_names = {col['COLUMN_NAME']: col for col in cols2}
col_diff = set(cols1_names.keys()) - set(cols2_names.keys())
if col_diff:
print(f" Columns only in {schema1}: {', '.join(sorted(col_diff))}")
col_diff = set(cols2_names.keys()) - set(cols1_names.keys())
if col_diff:
print(f" Columns only in {schema2}: {', '.join(sorted(col_diff))}")
# Usage
compare_schemas(conn, 'PROD_SCHEMA', 'TEST_SCHEMA')Phoenix extends standard JDBC metadata with additional fields:
# Phoenix extends standard JDBC metadata with additional fields:
def show_phoenix_extensions(connection, schema_name, table_name):
    """Show Phoenix-specific metadata extensions.

    Prints the Phoenix extension fields returned by get_primary_keys()
    (ASC_OR_DESC, DATA_TYPE, TYPE_ID, VIEW_CONSTANT) and get_index_info()
    (COLUMN_FAMILY, ARRAY_SIZE).

    Parameters:
        connection: Open phoenixdb connection.
        schema_name (str): Schema containing the table.
        table_name (str): Table to inspect.
    """
    meta = connection.meta()

    # Primary key extensions
    primary_keys = meta.get_primary_keys(schema=schema_name, table=table_name)
    if primary_keys:
        print("Phoenix Primary Key Extensions:")
        for pk in primary_keys:
            print(f" Column: {pk['COLUMN_NAME']}")
            print(f" ASC_OR_DESC: {pk.get('ASC_OR_DESC')}")
            print(f" DATA_TYPE: {pk.get('DATA_TYPE')}")
            print(f" TYPE_ID: {pk.get('TYPE_ID')}")
            print(f" VIEW_CONSTANT: {pk.get('VIEW_CONSTANT')}")

    # Index extensions
    indexes = meta.get_index_info(schema=schema_name, table=table_name)
    if indexes:
        print("\nPhoenix Index Extensions:")
        for idx in indexes:
            if idx['INDEX_NAME']:  # skip statistics rows that have no index name
                print(f" Index: {idx['INDEX_NAME']}")
                print(f" Column Family: {idx.get('COLUMN_FAMILY')}")
                print(f" Array Size: {idx.get('ARRAY_SIZE')}")

# Usage:
# show_phoenix_extensions(conn, 'MY_SCHEMA', 'MY_TABLE')

# Install with the Tessl CLI:
npx tessl i tessl/pypi-phoenixdb