Amazon Redshift connector for Python implementing Python Database API Specification 2.0
---
Database schema introspection capabilities for retrieving metadata about tables, columns, procedures, and other database objects. The redshift_connector provides comprehensive metadata operations that enable applications to dynamically discover and work with database schemas.
Retrieve information about tables, views, and other relations in the database with optional filtering by catalog, schema, and table name patterns.
class Cursor:
def get_tables(
self,
catalog: "str | None" = None,
schema: "str | None" = None,
table: "str | None" = None,
types: "list[str] | None" = None
) -> tuple:
"""
Retrieve table metadata from the database.
Parameters:
- catalog: Catalog name pattern (None for all catalogs)
- schema: Schema name pattern (None for all schemas)
- table: Table name pattern (None for all tables)
- types: List of table types to include (e.g., ['TABLE', 'VIEW'])
Returns:
Tuple of tuples, each containing:
(table_cat, table_schem, table_name, table_type, remarks,
type_cat, type_schem, type_name, self_referencing_col_name,
ref_generation)
Common table types: 'TABLE', 'VIEW', 'SYSTEM TABLE', 'GLOBAL TEMPORARY',
'LOCAL TEMPORARY', 'ALIAS', 'SYNONYM'
"""

Detailed information about table columns including data types, nullability, defaults, and constraints.
class Cursor:
def get_columns(
self,
catalog: "str | None" = None,
schema: "str | None" = None,
table: "str | None" = None,
column: "str | None" = None
) -> tuple:
"""
Retrieve column metadata from database tables.
Parameters:
- catalog: Catalog name pattern (None for all catalogs)
- schema: Schema name pattern (None for all schemas)
- table: Table name pattern (None for all tables)
- column: Column name pattern (None for all columns)
Returns:
Tuple of tuples, each containing:
(table_cat, table_schem, table_name, column_name, data_type,
type_name, column_size, buffer_length, decimal_digits, num_prec_radix,
nullable, remarks, column_def, sql_data_type, sql_datetime_sub,
char_octet_length, ordinal_position, is_nullable, scope_catalog,
scope_schema, scope_table, source_data_type, is_autoincrement, is_generatedcolumn)
"""

Retrieve primary key constraints and their constituent columns for tables.
class Cursor:
def get_primary_keys(
self,
catalog: "str | None" = None,
schema: "str | None" = None,
table: "str | None" = None
) -> tuple:
"""
Retrieve primary key information for tables.
Parameters:
- catalog: Catalog name (None for current catalog)
- schema: Schema name (None for current schema)
- table: Table name (None for all tables)
Returns:
Tuple of tuples, each containing:
(table_cat, table_schem, table_name, column_name, key_seq, pk_name)
Results are ordered by table_cat, table_schem, table_name, key_seq
"""

Information about stored procedures and functions available in the database.
class Cursor:
def get_procedures(
self,
catalog: "str | None" = None,
schema: "str | None" = None,
procedure: "str | None" = None
) -> tuple:
"""
Retrieve stored procedure metadata.
Parameters:
- catalog: Catalog name pattern (None for all catalogs)
- schema: Schema name pattern (None for all schemas)
- procedure: Procedure name pattern (None for all procedures)
Returns:
Tuple of tuples, each containing:
(procedure_cat, procedure_schem, procedure_name, num_input_params,
num_output_params, num_result_sets, remarks, procedure_type,
specific_name)
"""

Retrieve information about available schemas (namespaces) in the database.
class Cursor:
def get_schemas(
self,
catalog: "str | None" = None,
schema: "str | None" = None
) -> tuple:
"""
Retrieve schema metadata.
Parameters:
- catalog: Catalog name (None for current catalog)
- schema: Schema name pattern (None for all schemas)
Returns:
Tuple of tuples, each containing:
(table_schem, table_catalog)
"""

Retrieve information about available catalogs (databases) accessible to the current connection.
# NOTE(review): result scope appears to be governed by the connection's
# database_metadata_current_db_only option — confirm before relying on
# cross-database listings.
class Cursor:
def get_catalogs(self) -> tuple:
"""
Retrieve catalog (database) metadata.
Returns:
Tuple of tuples, each containing:
(table_cat,)
Lists all catalogs (databases) available to the current user.
"""

Practical examples demonstrating metadata introspection for common use cases.
import redshift_connector
conn = redshift_connector.connect(
host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',
database='dev',
user='awsuser',
password='password'
)
cursor = conn.cursor()
cursor = conn.cursor()
# List all tables in the current database
tables = cursor.get_tables()
print("Available tables:")
for table in tables:
# First four fields of a get_tables row: catalog, schema, name, type.
catalog, schema, table_name, table_type = table[:4]
print(f" {schema}.{table_name} ({table_type})")
# List tables in a specific schema
sales_tables = cursor.get_tables(schema='sales')
print("\nTables in 'sales' schema:")
for table in sales_tables:
print(f" {table[2]} ({table[3]})") # table_name, table_type
# Get detailed column information for a specific table
columns = cursor.get_columns(schema='sales', table='orders')
print("\nColumns in 'sales.orders' table:")
for col in columns:
# Field 10 (nullable) is an integer flag; see the get_columns row layout.
schema_name, table_name, col_name, data_type, type_name, col_size, nullable = col[1], col[2], col[3], col[4], col[5], col[6], col[10]
null_str = "NULL" if nullable else "NOT NULL"
print(f" {col_name}: {type_name}({col_size}) {null_str}")
# Find primary keys for a table
pk_info = cursor.get_primary_keys(schema='sales', table='orders')
if pk_info:
print(f"\nPrimary key for 'sales.orders':")
pk_columns = [col[3] for col in sorted(pk_info, key=lambda x: x[4])] # Sort by key_seq
print(f" Columns: {', '.join(pk_columns)}")
# Field 5 of a get_primary_keys row is the constraint (pk) name.
print(f" Constraint name: {pk_info[0][5]}")
# List all schemas
schemas = cursor.get_schemas()
print(f"\nAvailable schemas:")
for schema in schemas:
print(f" {schema[0]}")
# List all catalogs (databases)
catalogs = cursor.get_catalogs()
print(f"\nAvailable catalogs:")
for catalog in catalogs:
print(f" {catalog[0]}")
# Find tables matching a pattern
user_tables = cursor.get_tables(table='user%') # Tables starting with 'user'
print(f"\nTables matching 'user%' pattern:")
for table in user_tables:
print(f" {table[1]}.{table[2]}")
# Get stored procedures
procedures = cursor.get_procedures()
if procedures:
print(f"\nAvailable procedures:")
for proc in procedures:
# Fields: 1 = schema, 2 = procedure name, 7 = procedure_type.
schema, proc_name, proc_type = proc[1], proc[2], proc[7]
print(f" {schema}.{proc_name} (type: {proc_type})")
else:
print("\nNo stored procedures found")
# Release the cursor and connection when done.
cursor.close()
conn.close()

Advanced patterns for dynamically discovering and working with database schemas.
def discover_table_structure(cursor, schema_name, table_name):
    """
    Build a complete structural description of a single table.

    Combines get_tables(), get_columns(), and get_primary_keys() output
    into one dict with keys: schema, table, table_type, columns,
    primary_key.

    Raises:
        ValueError: if no table matches schema_name.table_name.
    """
    matches = cursor.get_tables(schema=schema_name, table=table_name)
    if not matches:
        raise ValueError(f"Table {schema_name}.{table_name} not found")
    # table_type is the fourth field of a get_tables row.
    relation_kind = matches[0][3]

    # One dict per column, pulled from the documented get_columns layout.
    described_columns = [
        {
            'name': row[3],
            'data_type': row[4],
            'type_name': row[5],
            'column_size': row[6],
            'decimal_digits': row[8],
            'nullable': row[10] == 1,
            'default': row[12],
            'ordinal_position': row[16],
            'is_autoincrement': row[22] == 'YES',
        }
        for row in cursor.get_columns(schema=schema_name, table=table_name)
    ]

    # key_seq (field 5) orders columns within a composite key.
    key_rows = cursor.get_primary_keys(schema=schema_name, table=table_name)
    ordered_key = []
    if key_rows:
        ordered_key = [r[3] for r in sorted(key_rows, key=lambda r: r[4])]

    return {
        'schema': schema_name,
        'table': table_name,
        'table_type': relation_kind,
        'columns': described_columns,
        'primary_key': ordered_key,
    }
def generate_create_table_ddl(cursor, schema_name, table_name):
    """
    Generate a CREATE TABLE DDL statement based on table metadata.

    Parameters:
    - cursor: metadata-capable cursor (forwarded to discover_table_structure)
    - schema_name: schema containing the table
    - table_name: table to reverse-engineer into DDL

    Returns:
        A CREATE TABLE string with one definition per column and, when the
        table has one, a trailing PRIMARY KEY clause.

    Raises:
        ValueError: propagated from discover_table_structure when the table
        does not exist.
    """
    structure = discover_table_structure(cursor, schema_name, table_name)
    ddl = f"CREATE TABLE {schema_name}.{table_name} (\n"
    column_defs = []
    for col in structure['columns']:
        col_def = f" {col['name']} {col['type_name']}"
        # Compare against None instead of truthiness so a legitimate size
        # of 0 is still emitted.
        if col['column_size'] is not None:
            col_def += f"({col['column_size']}"
            if col['decimal_digits']:
                col_def += f", {col['decimal_digits']}"
            col_def += ")"
        if not col['nullable']:
            col_def += " NOT NULL"
        # None-check so a default expression of "0" is never dropped.
        if col['default'] is not None:
            col_def += f" DEFAULT {col['default']}"
        column_defs.append(col_def)
    ddl += ",\n".join(column_defs)
    if structure['primary_key']:
        pk_def = f" PRIMARY KEY ({', '.join(structure['primary_key'])})"
        ddl += f",\n{pk_def}"
    ddl += "\n);"
    return ddl
# Usage example
cursor = conn.cursor()
try:
# Discover table structure
structure = discover_table_structure(cursor, 'public', 'users')
print(f"Table: {structure['schema']}.{structure['table']}")
print(f"Type: {structure['table_type']}")
print(f"Columns: {len(structure['columns'])}")
print(f"Primary Key: {structure['primary_key']}")
# Generate DDL
ddl = generate_create_table_ddl(cursor, 'public', 'users')
print(f"\nGenerated DDL:\n{ddl}")
# discover_table_structure raises ValueError when the table is missing.
except ValueError as e:
print(f"Error: {e}")

Working with metadata across multiple databases and schemas in Redshift.
# Database metadata scope configuration
# NOTE: illustrative call — real host/database/user/password parameters are
# required where the placeholder comment appears below.
conn = redshift_connector.connect(
# ... connection parameters
database_metadata_current_db_only=True # Limit metadata to current database (default)
# database_metadata_current_db_only=False # Include cross-database metadata (datashare support)
)
def list_all_databases_and_schemas(cursor):
    """
    Walk every accessible database, printing its schemas and the number
    of tables each schema contains.
    """
    for db_row in cursor.get_catalogs():
        catalog_name = db_row[0]
        print(f"Database: {catalog_name}")
        # Restrict the schema listing to the current catalog.
        for ns_row in cursor.get_schemas(catalog=catalog_name):
            schema_name = ns_row[0]
            print(f" Schema: {schema_name}")
            # Table count per schema.
            tables = cursor.get_tables(catalog=catalog_name, schema=schema_name)
            print(f" Tables: {len(tables)}")
def find_tables_by_column(cursor, column_name):
    """
    Find all tables that contain a specific column name.

    Returns a list of unique (catalog, schema, table) tuples; ordering is
    unspecified because results are deduplicated through a set.
    """
    unique_tables = {
        (row[0], row[1], row[2])  # catalog, schema, table
        for row in cursor.get_columns(column=column_name)
    }
    return list(unique_tables)
# Example usage
cursor = conn.cursor()
# Find all tables with a 'user_id' column
tables_with_user_id = find_tables_by_column(cursor, 'user_id')
# Result order is unspecified (the helper deduplicates through a set).
print("Tables containing 'user_id' column:")
for catalog, schema, table in tables_with_user_id:
print(f" {catalog}.{schema}.{table}")

Strategies for efficient metadata operations and caching for better performance.
class MetadataCache:
    """Simple metadata cache for improved performance.

    Memoizes get_tables()/get_columns() lookups per (schema, table) so
    repeated introspection of the same relation hits the database once.
    """

    def __init__(self, cursor):
        self.cursor = cursor
        self._table_cache = {}
        self._column_cache = {}

    def get_table_info(self, schema, table):
        """Return the first matching table row (None when absent), cached."""
        cache_key = (schema, table)
        try:
            return self._table_cache[cache_key]
        except KeyError:
            rows = self.cursor.get_tables(schema=schema, table=table)
            # A miss (no such table) is cached as None as well.
            self._table_cache[cache_key] = rows[0] if rows else None
            return self._table_cache[cache_key]

    def get_column_info(self, schema, table):
        """Return the full get_columns() result for one table, cached."""
        cache_key = (schema, table)
        try:
            return self._column_cache[cache_key]
        except KeyError:
            self._column_cache[cache_key] = self.cursor.get_columns(
                schema=schema, table=table
            )
            return self._column_cache[cache_key]

    def clear_cache(self):
        """Drop every cached entry; the next access re-queries the database."""
        self._table_cache.clear()
        self._column_cache.clear()
# Usage example
cursor = conn.cursor()
metadata_cache = MetadataCache(cursor)
# First access - queries database
table_info = metadata_cache.get_table_info('public', 'users')
columns = metadata_cache.get_column_info('public', 'users')
# Subsequent accesses - uses cache
# (call metadata_cache.clear_cache() after DDL changes to avoid stale results)
table_info2 = metadata_cache.get_table_info('public', 'users') # From cache
columns2 = metadata_cache.get_column_info('public', 'users') # From cache

Install with Tessl CLI
npx tessl i tessl/pypi-redshift-connector