Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cluster metadata access, schema introspection, and topology information with complete keyspace, table, and column metadata. The metadata system provides comprehensive access to cluster topology and schema information.
Primary metadata container providing access to cluster topology and schema information.
class Metadata:
    """
    Primary metadata container providing access to cluster topology and
    schema information.

    This is an interface listing: only signatures and docstrings are shown.

    NOTE(review): confirm the lookup helpers below (get_keyspace, get_table,
    get_user_type, get_function, get_aggregate, rebuild_schema) against the
    installed driver version; the dict-valued `keyspaces` property is the
    other documented way to reach the same objects.
    """

    def __init__(self):
        """Container for cluster metadata including keyspaces, tables, and hosts."""

    def get_keyspace(self, keyspace):
        """
        Get metadata for a specific keyspace.
        Parameters:
        - keyspace (str): Name of the keyspace
        Returns:
        KeyspaceMetadata: Keyspace metadata or None if not found
        """

    def get_table(self, keyspace, table):
        """
        Get metadata for a specific table.
        Parameters:
        - keyspace (str): Name of the keyspace
        - table (str): Name of the table
        Returns:
        TableMetadata: Table metadata or None if not found
        """

    def get_user_type(self, keyspace, user_type):
        """
        Get metadata for a user-defined type.
        Parameters:
        - keyspace (str): Name of the keyspace
        - user_type (str): Name of the user-defined type
        Returns:
        UserType: User-defined type metadata or None if not found
        """

    def get_function(self, keyspace, function, signature):
        """
        Get metadata for a user-defined function.
        Parameters:
        - keyspace (str): Name of the keyspace
        - function (str): Name of the function
        - signature (list): Function signature (argument types)
        Returns:
        Function: Function metadata or None if not found
        """

    def get_aggregate(self, keyspace, aggregate, signature):
        """
        Get metadata for a user-defined aggregate.
        Parameters:
        - keyspace (str): Name of the keyspace
        - aggregate (str): Name of the aggregate
        - signature (list): Aggregate signature (argument types)
        Returns:
        Aggregate: Aggregate metadata or None if not found
        """

    def get_host(self, address):
        """
        Get host metadata by address.
        Parameters:
        - address (str): Host IP address
        Returns:
        Host: Host metadata or None if not found
        """

    def all_hosts(self):
        """
        Get all known hosts in the cluster.
        Returns:
        list: List of Host objects
        """

    def rebuild_schema(self, keyspace=None):
        """
        Rebuild schema metadata from the cluster.
        Parameters:
        - keyspace (str): Specific keyspace to rebuild, or None for all
        """

    @property
    def keyspaces(self):
        """dict: Dictionary mapping keyspace names to KeyspaceMetadata objects"""

    @property
    def cluster_name(self):
        """str: Name of the cluster"""

    @property
    def partitioner(self):
        """str: Partitioner used by the cluster"""

    @property
    def token_map(self):
        """TokenMap: Token mapping for the cluster"""

    @property
    def hosts(self):
        """dict: Dictionary mapping host addresses to Host objects"""


# Metadata for Cassandra keyspaces including replication configuration and contained objects.
class KeyspaceMetadata:
    """
    Metadata for a Cassandra keyspace: replication configuration and the
    schema objects it contains.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, name, durable_writes, strategy_class, strategy_options):
        """
        Metadata for a Cassandra keyspace.
        Parameters:
        - name (str): Keyspace name
        - durable_writes (bool): Whether durable writes are enabled
        - strategy_class (str): Replication strategy class
        - strategy_options (dict): Replication strategy options
        """

    @property
    def name(self):
        """str: Name of the keyspace"""

    @property
    def durable_writes(self):
        """bool: Whether durable writes are enabled"""

    @property
    def replication_strategy(self):
        """ReplicationStrategy: Replication strategy instance"""

    @property
    def tables(self):
        """dict: Dictionary mapping table names to TableMetadata objects"""

    @property
    def user_types(self):
        """dict: Dictionary mapping type names to UserType objects"""

    @property
    def functions(self):
        """dict: Dictionary mapping function signatures to Function objects"""

    @property
    def aggregates(self):
        """dict: Dictionary mapping aggregate signatures to Aggregate objects"""

    def export_as_string(self):
        """
        Export keyspace definition as CQL string.
        Returns:
        str: CQL CREATE KEYSPACE statement
        """


# Comprehensive metadata for Cassandra tables including columns, indexes, and options.
class TableMetadata:
    """
    Metadata for a Cassandra table: columns, keys, indexes, and options.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, keyspace, name, columns, partition_key, clustering_key, options, triggers, indexes):
        """
        Metadata for a Cassandra table.
        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Table name
        - columns (list): List of ColumnMetadata objects
        - partition_key (list): Partition key columns
        - clustering_key (list): Clustering key columns
        - options (dict): Table options
        - triggers (dict): Table triggers
        - indexes (dict): Secondary indexes
        """

    @property
    def keyspace_name(self):
        """str: Name of the keyspace containing this table"""

    @property
    def name(self):
        """str: Name of the table"""

    @property
    def columns(self):
        """dict: Dictionary mapping column names to ColumnMetadata objects"""

    @property
    def partition_key(self):
        """list: List of ColumnMetadata objects forming the partition key"""

    @property
    def clustering_key(self):
        """list: List of ColumnMetadata objects forming the clustering key"""

    @property
    def primary_key(self):
        """list: Complete primary key (partition key + clustering key)"""

    @property
    def options(self):
        """dict: Table options (compaction, compression, etc.)"""

    @property
    def triggers(self):
        """dict: Dictionary mapping trigger names to TriggerMetadata objects"""

    @property
    def indexes(self):
        """dict: Dictionary mapping index names to IndexMetadata objects"""

    def export_as_string(self):
        """
        Export table definition as CQL string.
        Returns:
        str: CQL CREATE TABLE statement
        """

    def is_cql_compatible(self):
        """
        Check if table is compatible with CQL.
        Returns:
        bool: True if table can be used with CQL
        """


# Metadata for individual table columns including type and constraints.
class ColumnMetadata:
    """
    Metadata for an individual table column, including its type and key role.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, table, name, cql_type, is_static=False, is_reversed=False):
        """
        Metadata for a table column.
        Parameters:
        - table (TableMetadata): Parent table
        - name (str): Column name
        - cql_type: CQL type of the column
        - is_static (bool): Whether column is static
        - is_reversed (bool): Whether column has reversed order
        """

    @property
    def table(self):
        """TableMetadata: Parent table metadata"""

    @property
    def name(self):
        """str: Name of the column"""

    @property
    def cql_type(self):
        """_CassandraType: CQL type of the column"""

    @property
    def is_static(self):
        """bool: Whether this is a static column"""

    @property
    def is_reversed(self):
        """bool: Whether this column has reversed clustering order"""

    @property
    def is_partition_key(self):
        """bool: Whether this column is part of the partition key"""

    @property
    def is_clustering_key(self):
        """bool: Whether this column is part of the clustering key"""

    @property
    def is_primary_key(self):
        """bool: Whether this column is part of the primary key"""


# Metadata for secondary indexes on tables.
class IndexMetadata:
    """
    Metadata for a secondary index on a table.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, table, name, kind, options):
        """
        Metadata for a secondary index.
        Parameters:
        - table (TableMetadata): Parent table
        - name (str): Index name
        - kind (str): Index type/kind
        - options (dict): Index options
        """

    @property
    def table(self):
        """TableMetadata: Parent table metadata"""

    @property
    def name(self):
        """str: Name of the index"""

    @property
    def kind(self):
        """str: Type of index (COMPOSITES, KEYS, CUSTOM)"""

    @property
    def options(self):
        """dict: Index configuration options"""

    def export_as_string(self):
        """
        Export index definition as CQL string.
        Returns:
        str: CQL CREATE INDEX statement
        """


# Metadata for user-defined composite types.
class UserType:
    """
    Metadata for a user-defined composite type.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, keyspace, name, field_names, field_types):
        """
        Metadata for a user-defined type.
        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Type name
        - field_names (list): Field names
        - field_types (list): Field types
        """

    @property
    def keyspace(self):
        """str: Keyspace containing this type"""

    @property
    def name(self):
        """str: Name of the type"""

    @property
    def field_names(self):
        """list: Names of fields in this type"""

    @property
    def field_types(self):
        """list: Types of fields in this type"""

    def export_as_string(self):
        """
        Export type definition as CQL string.
        Returns:
        str: CQL CREATE TYPE statement
        """


# Metadata for user-defined functions and aggregates.
class Function:
    """
    Metadata for a user-defined function.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, keyspace, name, argument_names, argument_types, body, called_on_null_input, language, return_type):
        """
        Metadata for a user-defined function.
        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Function name
        - argument_names (list): Parameter names
        - argument_types (list): Parameter types
        - body (str): Function body code
        - called_on_null_input (bool): Whether function is called on null input
        - language (str): Implementation language
        - return_type: Return type
        """

    @property
    def keyspace_name(self):
        """str: Keyspace containing this function"""

    @property
    def name(self):
        """str: Name of the function"""

    @property
    def argument_names(self):
        """list: Names of function parameters"""

    @property
    def argument_types(self):
        """list: Types of function parameters"""

    @property
    def signature(self):
        """str: Function signature string"""

    @property
    def body(self):
        """str: Function implementation code"""

    @property
    def called_on_null_input(self):
        """bool: Whether function is called when input is null"""

    @property
    def language(self):
        """str: Implementation language (java, javascript, etc.)"""

    @property
    def return_type(self):
        """_CassandraType: Return type of the function"""
class Aggregate:
    """
    Metadata for a user-defined aggregate.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, keyspace, name, argument_types, state_func, state_type, final_func, initial_condition, return_type):
        """
        Metadata for a user-defined aggregate.
        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Aggregate name
        - argument_types (list): Input types
        - state_func (str): State function name
        - state_type: State type
        - final_func (str): Final function name
        - initial_condition: Initial state value
        - return_type: Return type
        """

    @property
    def keyspace_name(self):
        """str: Keyspace containing this aggregate"""

    @property
    def name(self):
        """str: Name of the aggregate"""

    @property
    def argument_types(self):
        """list: Types of aggregate input"""

    @property
    def signature(self):
        """str: Aggregate signature string"""

    @property
    def state_func(self):
        """str: Name of the state function"""

    @property
    def state_type(self):
        """_CassandraType: Type of the state value"""

    @property
    def final_func(self):
        """str: Name of the final function"""

    @property
    def initial_condition(self):
        """Initial state value"""

    @property
    def return_type(self):
        """_CassandraType: Return type of the aggregate"""


# Replication strategy implementations for keyspaces.
class ReplicationStrategy:
    """Base class for replication strategies."""

    @property
    def name(self):
        """str: Name of the replication strategy"""

    @property
    def options(self):
        """dict: Strategy configuration options"""
class SimpleStrategy(ReplicationStrategy):
    """Simple replication strategy for single-datacenter clusters."""

    def __init__(self, replication_factor):
        """
        Simple replication strategy for single-datacenter clusters.
        Parameters:
        - replication_factor (int): Number of replicas
        """

    @property
    def replication_factor(self):
        """int: Number of replicas"""
class NetworkTopologyStrategy(ReplicationStrategy):
    """Network topology replication strategy for multi-datacenter clusters."""

    def __init__(self, dc_replication_factors):
        """
        Network topology replication strategy for multi-datacenter clusters.
        Parameters:
        - dc_replication_factors (dict): Replication factors by datacenter
        """

    @property
    def dc_replication_factors(self):
        """dict: Replication factors by datacenter name"""
class LocalStrategy(ReplicationStrategy):
    """Local replication strategy (for system keyspaces)."""

    def __init__(self):
        """Local replication strategy (for system keyspaces)."""


# Token ring and routing information for the cluster.
class TokenMap:
    """
    Token ring and routing information for the cluster.

    Interface listing only; method bodies are omitted.
    """

    def __init__(self, token_to_host_owner, tokens_to_host_owners, ring):
        """
        Token mapping for cluster routing.
        Parameters:
        - token_to_host_owner (dict): Mapping of tokens to primary hosts
        - tokens_to_host_owners (dict): Mapping of tokens to replica sets
        - ring (list): Ordered list of tokens in the ring
        """

    def get_replicas(self, keyspace, token):
        """
        Get replica hosts for a token in a keyspace.
        Parameters:
        - keyspace (str): Keyspace name
        - token: Token to look up
        Returns:
        set: Set of Host objects that are replicas for the token
        """

    @property
    def ring(self):
        """list: Ordered list of tokens in the cluster ring"""
class Token:
    """Base class for partition tokens."""

    @property
    def value(self):
        """Token value"""
class Murmur3Token(Token):
    """Murmur3 hash token (default partitioner)."""

    def __init__(self, value):
        """
        Murmur3 hash token (default partitioner).
        Parameters:
        - value (int): Token value
        """
class MD5Token(Token):
    """MD5 hash token (legacy partitioner)."""

    def __init__(self, value):
        """
        MD5 hash token (legacy partitioner).
        Parameters:
        - value (int): Token value
        """
class BytesToken(Token):
    """Bytes-based token (byte order partitioner)."""

    def __init__(self, value):
        """
        Bytes-based token (byte order partitioner).
        Parameters:
        - value (bytes): Token value
        """


# Utility functions for working with CQL identifiers and values.
def protect_names(names):
    """
    Quote CQL identifiers that need protection.
    Parameters:
    - names (list): List of CQL identifiers
    Returns:
    list: List of quoted identifiers
    """
def protect_name(name):
    """
    Quote a CQL identifier if it needs protection.
    Parameters:
    - name (str): CQL identifier
    Returns:
    str: Quoted identifier if needed, otherwise original name
    """
def protect_value(value):
    """
    Quote a CQL value for safe inclusion in queries.
    Parameters:
    - value: Value to quote
    Returns:
    str: Quoted value suitable for CQL
    """
def is_valid_name(name):
    """
    Check if a name is a valid unquoted CQL identifier.
    Parameters:
    - name (str): Identifier to check
    Returns:
    bool: True if the name is valid unquoted
    """
def escape_name(name):
    """
    Escape a CQL identifier for use in quoted form.
    Parameters:
    - name (str): Identifier to escape
    Returns:
    str: Escaped identifier
    """


# Get cluster metadata
# Example: print a cluster overview. Assumes an already-connected `cluster`
# object is in scope.
metadata = cluster.metadata
print(f"Cluster name: {metadata.cluster_name}")
print(f"Partitioner: {metadata.partitioner}")
print(f"Total hosts: {len(metadata.all_hosts())}")

# List all keyspaces
print("\nKeyspaces:")
for keyspace_name, keyspace in metadata.keyspaces.items():
    print(f" {keyspace_name}: {keyspace.replication_strategy}")

# Explore a specific keyspace
keyspace = metadata.get_keyspace('my_app')
if keyspace:  # get_keyspace is documented to return None when not found
    print(f"\nKeyspace '{keyspace.name}':")
    print(f" Durable writes: {keyspace.durable_writes}")
    print(f" Tables: {list(keyspace.tables.keys())}")
    print(f" User types: {list(keyspace.user_types.keys())}")
    print(f" Functions: {len(keyspace.functions)}")

# Get table metadata
# Example: inspect one table. Assumes `metadata` (cluster.metadata) is in scope.
table = metadata.get_table('my_app', 'users')
if table:  # get_table is documented to return None when not found
    print(f"Table: {table.keyspace_name}.{table.name}")

    # Show partition key
    print(f"Partition key: {[col.name for col in table.partition_key]}")

    # Show clustering key
    if table.clustering_key:
        print(f"Clustering key: {[col.name for col in table.clustering_key]}")

    # Show all columns
    print("\nColumns:")
    for col_name, column in table.columns.items():
        # Label each column with its role; the first matching role wins.
        key_type = ""
        if column.is_partition_key:
            key_type = " (partition key)"
        elif column.is_clustering_key:
            key_type = " (clustering key)"
        elif column.is_static:
            key_type = " (static)"
        print(f" {col_name}: {column.cql_type}{key_type}")

    # Show indexes
    if table.indexes:
        print("\nIndexes:")
        for index_name, index in table.indexes.items():
            print(f" {index_name}: {index.kind}")

    # Show table options
    print(f"\nTable options: {table.options}")

    # Export as CQL
    print(f"\nCQL Definition:\n{table.export_as_string()}")

# Get UDT metadata
# Example: inspect a user-defined type and find the columns that use it.
# Assumes `metadata` (cluster.metadata) is in scope.
address_type = metadata.get_user_type('my_app', 'address')
if address_type:  # get_user_type is documented to return None when not found
    print(f"User type: {address_type.keyspace}.{address_type.name}")
    print("Fields:")
    for field_name, field_type in zip(address_type.field_names, address_type.field_types):
        print(f" {field_name}: {field_type}")
    print(f"\nCQL Definition:\n{address_type.export_as_string()}")

    # Find tables using this UDT
    print(f"\nTables using {address_type.name}:")
    keyspace = metadata.get_keyspace('my_app')
    if keyspace:  # guard: the keyspace lookup may also return None
        for table_name, table in keyspace.tables.items():
            for col_name, column in table.columns.items():
                # Not every CQL type object exposes `typename`, hence hasattr.
                if hasattr(column.cql_type, 'typename') and column.cql_type.typename == 'address':
                    print(f" {table_name}.{col_name}")

# Examine hosts and datacenters
# Example: print cluster topology. Assumes `metadata` (cluster.metadata) is in scope.
print("Cluster topology:")

# Group hosts by datacenter; hosts with no datacenter go under 'unknown'.
hosts_by_dc = {}
for host in metadata.all_hosts():
    dc = host.datacenter or 'unknown'
    hosts_by_dc.setdefault(dc, []).append(host)

for dc, hosts in hosts_by_dc.items():
    print(f"\nDatacenter: {dc}")
    for host in hosts:
        status = "UP" if host.is_up else "DOWN"
        print(f" {host.address} ({host.rack}): {status} - {host.release_version}")

# Examine token distribution
token_map = metadata.token_map
if token_map:
    print(f"\nToken ring has {len(token_map.ring)} tokens")

    # Show token ownership for a keyspace
    keyspace_name = 'my_app'
    if keyspace_name in metadata.keyspaces:
        print(f"\nReplica distribution for keyspace '{keyspace_name}':")
        sample_tokens = token_map.ring[:5]  # Sample first 5 tokens
        for token in sample_tokens:
            replicas = token_map.get_replicas(keyspace_name, token)
            replica_addresses = [host.address for host in replicas]
            print(f" Token {token.value}: {replica_addresses}")


def compare_schemas(old_metadata, new_metadata, keyspace_name):
    """
    Compare two metadata snapshots to detect schema changes.

    Prints added/removed tables for the keyspace, then added/removed columns
    for every table present in both snapshots. Nothing is returned.

    Parameters:
    - old_metadata: Earlier metadata snapshot (must provide get_keyspace)
    - new_metadata: Later metadata snapshot (must provide get_keyspace)
    - keyspace_name (str): Keyspace to compare
    """
    old_ks = old_metadata.get_keyspace(keyspace_name)
    new_ks = new_metadata.get_keyspace(keyspace_name)
    if not old_ks or not new_ks:
        print("Keyspace not found in one of the metadata snapshots")
        return

    # Compare tables
    old_tables = set(old_ks.tables.keys())
    new_tables = set(new_ks.tables.keys())
    added_tables = new_tables - old_tables
    removed_tables = old_tables - new_tables
    common_tables = old_tables & new_tables
    if added_tables:
        print(f"Added tables: {added_tables}")
    if removed_tables:
        print(f"Removed tables: {removed_tables}")

    # Compare columns in common tables
    for table_name in common_tables:
        old_table = old_ks.tables[table_name]
        new_table = new_ks.tables[table_name]
        old_columns = set(old_table.columns.keys())
        new_columns = set(new_table.columns.keys())
        added_columns = new_columns - old_columns
        removed_columns = old_columns - new_columns
        if added_columns:
            print(f"Table {table_name} - Added columns: {added_columns}")
        if removed_columns:
            print(f"Table {table_name} - Removed columns: {removed_columns}")
# Usage
# NOTE(review): old_metadata and new_metadata are both bound to
# `cluster.metadata`. If rebuild_schema() updates that object in place, the
# two names refer to the same state and the comparison reports no changes —
# confirm, and capture the table/column names before rebuilding if so.
old_metadata = cluster.metadata
# ... time passes, schema changes occur ...
cluster.metadata.rebuild_schema()
new_metadata = cluster.metadata
compare_schemas(old_metadata, new_metadata, 'my_app')

from cassandra.metadata import protect_name, protect_value
def generate_insert_query(table_metadata, data):
    """
    Generate INSERT query from table metadata and data.

    Keys of `data` that are not columns of the table are silently dropped.
    The query uses one `?` bind marker per retained value.

    Parameters:
    - table_metadata: Table metadata providing keyspace_name, name, and columns
    - data (dict): Mapping of column name to value
    Returns:
    tuple: (query string, list of values in the same order as the columns)
    Raises:
    ValueError: If none of the keys in `data` is a column of the table
    """
    # Quote the keyspace as well as the table, so a keyspace name that needs
    # protection is handled the same way as table and column names.
    table_name = (
        f"{protect_name(table_metadata.keyspace_name)}."
        f"{protect_name(table_metadata.name)}"
    )

    # Filter data to only include existing columns
    valid_columns = []
    valid_values = []
    for col_name, value in data.items():
        if col_name in table_metadata.columns:
            valid_columns.append(protect_name(col_name))
            valid_values.append(value)

    if not valid_columns:
        raise ValueError("No valid columns found in data")

    columns_sql = ', '.join(valid_columns)
    placeholders_sql = ', '.join(['?'] * len(valid_columns))
    query = (
        f"INSERT INTO {table_name} ({columns_sql})\n"
        f"VALUES ({placeholders_sql})"
    )
    return query, valid_values
# Usage
import uuid

table = metadata.get_table('my_app', 'users')
data = {
    'id': uuid.uuid4(),
    'name': 'Alice Smith',
    'email': 'alice@example.com',
    'invalid_column': 'ignored'  # This will be filtered out
}
query, values = generate_insert_query(table, data)
print(f"Generated query: {query}")
print(f"Values: {values}")
# The generated query uses `?` bind markers, which the driver accepts only in
# prepared statements, so prepare it before executing.
prepared = session.prepare(query)
session.execute(prepared, values)


def analyze_keyspace_complexity(keyspace_metadata):
    """
    Analyze complexity metrics for a keyspace.

    Parameters:
    - keyspace_metadata: Keyspace metadata providing tables, user_types, and functions
    Returns:
    dict: Counts of tables, columns, indexes, tables with clustering keys,
    tables with static columns, user types, and functions, plus the set of
    column type class names under 'column_types'
    """
    metrics = {
        'total_tables': len(keyspace_metadata.tables),
        'total_columns': 0,
        'total_indexes': 0,
        'tables_with_clustering': 0,
        'tables_with_static_columns': 0,
        'user_types': len(keyspace_metadata.user_types),
        'functions': len(keyspace_metadata.functions),
        'column_types': set()
    }
    for table in keyspace_metadata.tables.values():
        metrics['total_columns'] += len(table.columns)
        metrics['total_indexes'] += len(table.indexes)
        if table.clustering_key:
            metrics['tables_with_clustering'] += 1
        if any(col.is_static for col in table.columns.values()):
            metrics['tables_with_static_columns'] += 1
        for column in table.columns.values():
            # Record the Python class name of each column's type object.
            metrics['column_types'].add(type(column.cql_type).__name__)
    return metrics


# Usage
keyspace = metadata.get_keyspace('my_app')
if keyspace:
    complexity = analyze_keyspace_complexity(keyspace)
    print("Keyspace complexity analysis:")
    for metric, value in complexity.items():
        if metric == 'column_types':
            # Sets are unordered; sort for stable output.
            print(f" {metric}: {sorted(value)}")
        else:
            print(f" {metric}: {value}")

# Install with Tessl CLI
npx tessl i tessl/pypi-cassandra-driver