CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cassandra-driver

Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

metadata.mddocs/

Metadata & Schema

The metadata system provides cluster metadata access, schema introspection, and topology information, including complete keyspace, table, and column metadata.

Capabilities

Cluster Metadata

Primary metadata container providing access to cluster topology and schema information.

class Metadata:
    def __init__(self) -> None:
        """Container for cluster metadata including keyspaces, tables, and hosts."""

    def get_keyspace(self, keyspace: str) -> "KeyspaceMetadata | None":
        """
        Get metadata for a specific keyspace.

        Parameters:
        - keyspace (str): Name of the keyspace

        Returns:
        KeyspaceMetadata: Keyspace metadata or None if not found
        """

    def get_table(self, keyspace: str, table: str) -> "TableMetadata | None":
        """
        Get metadata for a specific table.

        Parameters:
        - keyspace (str): Name of the keyspace
        - table (str): Name of the table

        Returns:
        TableMetadata: Table metadata or None if not found
        """

    def get_user_type(self, keyspace: str, user_type: str) -> "UserType | None":
        """
        Get metadata for a user-defined type.

        Parameters:
        - keyspace (str): Name of the keyspace
        - user_type (str): Name of the user-defined type

        Returns:
        UserType: User-defined type metadata or None if not found
        """

    def get_function(self, keyspace: str, function: str, signature: list) -> "Function | None":
        """
        Get metadata for a user-defined function.

        Parameters:
        - keyspace (str): Name of the keyspace
        - function (str): Name of the function
        - signature (list): Function signature (argument types)

        Returns:
        Function: Function metadata or None if not found
        """

    def get_aggregate(self, keyspace: str, aggregate: str, signature: list) -> "Aggregate | None":
        """
        Get metadata for a user-defined aggregate.

        Parameters:
        - keyspace (str): Name of the keyspace
        - aggregate (str): Name of the aggregate
        - signature (list): Aggregate signature (argument types)

        Returns:
        Aggregate: Aggregate metadata or None if not found
        """

    def get_host(self, address: str) -> "Host | None":
        """
        Get host metadata by address.

        Parameters:
        - address (str): Host IP address

        Returns:
        Host: Host metadata or None if not found
        """

    def all_hosts(self) -> list:
        """
        Get all known hosts in the cluster.

        Returns:
        list: List of Host objects
        """

    def rebuild_schema(self, keyspace: "str | None" = None):
        """
        Rebuild schema metadata from the cluster.

        Parameters:
        - keyspace (str): Specific keyspace to rebuild, or None for all
        """

    @property
    def keyspaces(self) -> dict:
        """dict: Dictionary mapping keyspace names to KeyspaceMetadata objects"""

    @property
    def cluster_name(self) -> str:
        """str: Name of the cluster"""

    @property
    def partitioner(self) -> str:
        """str: Partitioner used by the cluster"""

    @property
    def token_map(self) -> "TokenMap":
        """TokenMap: Token mapping for the cluster"""

    @property
    def hosts(self) -> dict:
        """dict: Dictionary mapping host addresses to Host objects"""

Keyspace Metadata

Metadata for Cassandra keyspaces including replication configuration and contained objects.

class KeyspaceMetadata:
    def __init__(self, name: str, durable_writes: bool, strategy_class: str, strategy_options: dict) -> None:
        """
        Metadata for a Cassandra keyspace.

        Parameters:
        - name (str): Keyspace name
        - durable_writes (bool): Whether durable writes are enabled
        - strategy_class (str): Replication strategy class
        - strategy_options (dict): Replication strategy options
        """

    @property
    def name(self) -> str:
        """str: Name of the keyspace"""

    @property
    def durable_writes(self) -> bool:
        """bool: Whether durable writes are enabled"""

    @property
    def replication_strategy(self) -> "ReplicationStrategy":
        """ReplicationStrategy: Replication strategy instance"""

    @property
    def tables(self) -> dict:
        """dict: Dictionary mapping table names to TableMetadata objects"""

    @property
    def user_types(self) -> dict:
        """dict: Dictionary mapping type names to UserType objects"""

    @property
    def functions(self) -> dict:
        """dict: Dictionary mapping function signatures to Function objects"""

    @property
    def aggregates(self) -> dict:
        """dict: Dictionary mapping aggregate signatures to Aggregate objects"""

    def export_as_string(self) -> str:
        """
        Export keyspace definition as CQL string.

        Returns:
        str: CQL CREATE KEYSPACE statement
        """

Table Metadata

Comprehensive metadata for Cassandra tables including columns, indexes, and options.

class TableMetadata:
    def __init__(self, keyspace: str, name: str, columns: list, partition_key: list, clustering_key: list, options: dict, triggers: dict, indexes: dict) -> None:
        """
        Metadata for a Cassandra table.

        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Table name
        - columns (list): List of ColumnMetadata objects
        - partition_key (list): Partition key columns
        - clustering_key (list): Clustering key columns
        - options (dict): Table options
        - triggers (dict): Table triggers
        - indexes (dict): Secondary indexes
        """

    @property
    def keyspace_name(self) -> str:
        """str: Name of the keyspace containing this table"""

    @property
    def name(self) -> str:
        """str: Name of the table"""

    @property
    def columns(self) -> dict:
        """dict: Dictionary mapping column names to ColumnMetadata objects"""

    @property
    def partition_key(self) -> list:
        """list: List of ColumnMetadata objects forming the partition key"""

    @property
    def clustering_key(self) -> list:
        """list: List of ColumnMetadata objects forming the clustering key"""

    @property
    def primary_key(self) -> list:
        """list: Complete primary key (partition key + clustering key)"""

    @property
    def options(self) -> dict:
        """dict: Table options (compaction, compression, etc.)"""

    @property
    def triggers(self) -> dict:
        """dict: Dictionary mapping trigger names to TriggerMetadata objects"""

    @property
    def indexes(self) -> dict:
        """dict: Dictionary mapping index names to IndexMetadata objects"""

    def export_as_string(self) -> str:
        """
        Export table definition as CQL string.

        Returns:
        str: CQL CREATE TABLE statement
        """

    def is_cql_compatible(self) -> bool:
        """
        Check if table is compatible with CQL.

        Returns:
        bool: True if table can be used with CQL
        """

Column Metadata

Metadata for individual table columns including type and constraints.

class ColumnMetadata:
    def __init__(self, table: "TableMetadata", name: str, cql_type, is_static: bool = False, is_reversed: bool = False) -> None:
        """
        Metadata for a table column.

        Parameters:
        - table (TableMetadata): Parent table
        - name (str): Column name
        - cql_type: CQL type of the column
        - is_static (bool): Whether column is static
        - is_reversed (bool): Whether column has reversed order
        """

    @property
    def table(self) -> "TableMetadata":
        """TableMetadata: Parent table metadata"""

    @property
    def name(self) -> str:
        """str: Name of the column"""

    @property
    def cql_type(self) -> "_CassandraType":
        """_CassandraType: CQL type of the column"""

    @property
    def is_static(self) -> bool:
        """bool: Whether this is a static column"""

    @property
    def is_reversed(self) -> bool:
        """bool: Whether this column has reversed clustering order"""

    @property
    def is_partition_key(self) -> bool:
        """bool: Whether this column is part of the partition key"""

    @property
    def is_clustering_key(self) -> bool:
        """bool: Whether this column is part of the clustering key"""

    @property
    def is_primary_key(self) -> bool:
        """bool: Whether this column is part of the primary key"""

Index Metadata

Metadata for secondary indexes on tables.

class IndexMetadata:
    def __init__(self, table: "TableMetadata", name: str, kind: str, options: dict) -> None:
        """
        Metadata for a secondary index.

        Parameters:
        - table (TableMetadata): Parent table
        - name (str): Index name
        - kind (str): Index type/kind
        - options (dict): Index options
        """

    @property
    def table(self) -> "TableMetadata":
        """TableMetadata: Parent table metadata"""

    @property
    def name(self) -> str:
        """str: Name of the index"""

    @property
    def kind(self) -> str:
        """str: Type of index (COMPOSITES, KEYS, CUSTOM)"""

    @property
    def options(self) -> dict:
        """dict: Index configuration options"""

    def export_as_string(self) -> str:
        """
        Export index definition as CQL string.

        Returns:
        str: CQL CREATE INDEX statement
        """

User-Defined Types

Metadata for user-defined composite types.

class UserType:
    def __init__(self, keyspace: str, name: str, field_names: list, field_types: list) -> None:
        """
        Metadata for a user-defined type.

        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Type name
        - field_names (list): Field names
        - field_types (list): Field types
        """

    @property
    def keyspace(self) -> str:
        """str: Keyspace containing this type"""

    @property
    def name(self) -> str:
        """str: Name of the type"""

    @property
    def field_names(self) -> list:
        """list: Names of fields in this type"""

    @property
    def field_types(self) -> list:
        """list: Types of fields in this type"""

    def export_as_string(self) -> str:
        """
        Export type definition as CQL string.

        Returns:
        str: CQL CREATE TYPE statement
        """

User-Defined Functions

Metadata for user-defined functions and aggregates.

class Function:
    def __init__(self, keyspace: str, name: str, argument_names: list, argument_types: list, body: str, called_on_null_input: bool, language: str, return_type) -> None:
        """
        Metadata for a user-defined function.

        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Function name
        - argument_names (list): Parameter names
        - argument_types (list): Parameter types
        - body (str): Function body code
        - called_on_null_input (bool): Whether function is called on null input
        - language (str): Implementation language
        - return_type: Return type
        """

    @property
    def keyspace_name(self) -> str:
        """str: Keyspace containing this function"""

    @property
    def name(self) -> str:
        """str: Name of the function"""

    @property
    def argument_names(self) -> list:
        """list: Names of function parameters"""

    @property
    def argument_types(self) -> list:
        """list: Types of function parameters"""

    @property
    def signature(self) -> str:
        """str: Function signature string"""

    @property
    def body(self) -> str:
        """str: Function implementation code"""

    @property
    def called_on_null_input(self) -> bool:
        """bool: Whether function is called when input is null"""

    @property
    def language(self) -> str:
        """str: Implementation language (java, javascript, etc.)"""

    @property
    def return_type(self) -> "_CassandraType":
        """_CassandraType: Return type of the function"""

class Aggregate:
    def __init__(self, keyspace: str, name: str, argument_types: list, state_func: str, state_type, final_func: str, initial_condition, return_type) -> None:
        """
        Metadata for a user-defined aggregate.

        Parameters:
        - keyspace (str): Keyspace name
        - name (str): Aggregate name
        - argument_types (list): Input types
        - state_func (str): State function name
        - state_type: State type
        - final_func (str): Final function name
        - initial_condition: Initial state value
        - return_type: Return type
        """

    @property
    def keyspace_name(self) -> str:
        """str: Keyspace containing this aggregate"""

    @property
    def name(self) -> str:
        """str: Name of the aggregate"""

    @property
    def argument_types(self) -> list:
        """list: Types of aggregate input"""

    @property
    def signature(self) -> str:
        """str: Aggregate signature string"""

    @property
    def state_func(self) -> str:
        """str: Name of the state function"""

    @property
    def state_type(self) -> "_CassandraType":
        """_CassandraType: Type of the state value"""

    @property
    def final_func(self) -> str:
        """str: Name of the final function"""

    @property
    def initial_condition(self):
        """Initial state value"""

    @property
    def return_type(self) -> "_CassandraType":
        """_CassandraType: Return type of the aggregate"""

Replication Strategies

Replication strategy implementations for keyspaces.

class ReplicationStrategy:
    """Base class for replication strategies."""

    @property
    def name(self) -> str:
        """str: Name of the replication strategy"""

    @property
    def options(self) -> dict:
        """dict: Strategy configuration options"""

class SimpleStrategy(ReplicationStrategy):
    def __init__(self, replication_factor: int) -> None:
        """
        Simple replication strategy for single-datacenter clusters.

        Parameters:
        - replication_factor (int): Number of replicas
        """

    @property
    def replication_factor(self) -> int:
        """int: Number of replicas"""

class NetworkTopologyStrategy(ReplicationStrategy):
    def __init__(self, dc_replication_factors: dict) -> None:
        """
        Network topology replication strategy for multi-datacenter clusters.

        Parameters:
        - dc_replication_factors (dict): Replication factors by datacenter
        """

    @property
    def dc_replication_factors(self) -> dict:
        """dict: Replication factors by datacenter name"""

class LocalStrategy(ReplicationStrategy):
    def __init__(self) -> None:
        """Local replication strategy (for system keyspaces)."""

Token Management

Token ring and routing information for the cluster.

class TokenMap:
    def __init__(self, token_to_host_owner: dict, tokens_to_host_owners: dict, ring: list) -> None:
        """
        Token mapping for cluster routing.

        Parameters:
        - token_to_host_owner (dict): Mapping of tokens to primary hosts
        - tokens_to_host_owners (dict): Mapping of tokens to replica sets
        - ring (list): Ordered list of tokens in the ring
        """

    def get_replicas(self, keyspace: str, token) -> set:
        """
        Get replica hosts for a token in a keyspace.

        Parameters:
        - keyspace (str): Keyspace name
        - token: Token to look up

        Returns:
        set: Set of Host objects that are replicas for the token
        """

    @property
    def ring(self) -> list:
        """list: Ordered list of tokens in the cluster ring"""

class Token:
    """Base class for partition tokens."""

    @property
    def value(self):
        """Token value; the type depends on the subclass (int or bytes)."""

class Murmur3Token(Token):
    def __init__(self, value: int) -> None:
        """
        Murmur3 hash token (default partitioner).

        Parameters:
        - value (int): Token value
        """

class MD5Token(Token):
    def __init__(self, value: int) -> None:
        """
        MD5 hash token (legacy partitioner).

        Parameters:
        - value (int): Token value
        """

class BytesToken(Token):
    def __init__(self, value: bytes) -> None:
        """
        Bytes-based token (byte order partitioner).

        Parameters:
        - value (bytes): Token value
        """

Metadata Utilities

Utility functions for working with CQL identifiers and values.

def protect_names(names: list) -> list:
    """
    Quote CQL identifiers that need protection.

    Parameters:
    - names (list): List of CQL identifiers

    Returns:
    list: List of quoted identifiers
    """

def protect_name(name: str) -> str:
    """
    Quote a CQL identifier if it needs protection.

    Parameters:
    - name (str): CQL identifier

    Returns:
    str: Quoted identifier if needed, otherwise original name
    """

def protect_value(value) -> str:
    """
    Quote a CQL value for safe inclusion in queries.

    Parameters:
    - value: Value to quote

    Returns:
    str: Quoted value suitable for CQL
    """

def is_valid_name(name: str) -> bool:
    """
    Check if a name is a valid unquoted CQL identifier.

    Parameters:
    - name (str): Identifier to check

    Returns:
    bool: True if the name is valid unquoted
    """

def escape_name(name: str) -> str:
    """
    Escape a CQL identifier for use in quoted form.

    Parameters:
    - name (str): Identifier to escape

    Returns:
    str: Escaped identifier
    """

Usage Examples

Exploring Cluster Schema

# Grab the metadata object exposed by the connected cluster
metadata = cluster.metadata

host_count = len(metadata.all_hosts())
print(f"Cluster name: {metadata.cluster_name}")
print(f"Partitioner: {metadata.partitioner}")
print(f"Total hosts: {host_count}")

# Walk every keyspace together with its replication strategy
print("\nKeyspaces:")
for keyspace_name, keyspace in metadata.keyspaces.items():
    print(f"  {keyspace_name}: {keyspace.replication_strategy}")

# Drill into one keyspace; get_keyspace() gives None when it does not exist
keyspace = metadata.get_keyspace('my_app')
if keyspace:
    table_names = list(keyspace.tables)
    type_names = list(keyspace.user_types)
    print(f"\nKeyspace '{keyspace.name}':")
    print(f"  Durable writes: {keyspace.durable_writes}")
    print(f"  Tables: {table_names}")
    print(f"  User types: {type_names}")
    print(f"  Functions: {len(keyspace.functions)}")

Examining Table Structure

# Look up the table; get_table() gives None when it does not exist
table = metadata.get_table('my_app', 'users')
if table:
    print(f"Table: {table.keyspace_name}.{table.name}")

    # Partition key column names
    partition_names = [col.name for col in table.partition_key]
    print(f"Partition key: {partition_names}")

    # Clustering key column names, only when the table has any
    if table.clustering_key:
        clustering_names = [col.name for col in table.clustering_key]
        print(f"Clustering key: {clustering_names}")

    # Every column, tagged with the first role that applies
    print("\nColumns:")
    for col_name, column in table.columns.items():
        key_type = (
            " (partition key)" if column.is_partition_key
            else " (clustering key)" if column.is_clustering_key
            else " (static)" if column.is_static
            else ""
        )
        print(f"  {col_name}: {column.cql_type}{key_type}")

    # Secondary indexes, only when the table has any
    if table.indexes:
        print("\nIndexes:")
        for index_name, index in table.indexes.items():
            print(f"  {index_name}: {index.kind}")

    # Raw table options
    print(f"\nTable options: {table.options}")

    # Full definition rendered as CQL
    cql_definition = table.export_as_string()
    print(f"\nCQL Definition:\n{cql_definition}")

Working with User-Defined Types

# Get UDT metadata
address_type = metadata.get_user_type('my_app', 'address')
if address_type:
    print(f"User type: {address_type.keyspace}.{address_type.name}")
    print("Fields:")
    for field_name, field_type in zip(address_type.field_names, address_type.field_types):
        print(f"  {field_name}: {field_type}")

    print(f"\nCQL Definition:\n{address_type.export_as_string()}")

    # Find tables using this UDT. This stays inside the guard because
    # get_user_type() returns None for an unknown type, and reading
    # address_type.name would then raise AttributeError.
    print(f"\nTables using {address_type.name}:")
    keyspace = metadata.get_keyspace('my_app')
    # get_keyspace() can also return None; treat that as "no tables".
    tables = keyspace.tables if keyspace else {}
    for table_name, table in tables.items():
        for col_name, column in table.columns.items():
            # Compare against the looked-up type's own name instead of
            # repeating the 'address' literal.
            if getattr(column.cql_type, 'typename', None) == address_type.name:
                print(f"  {table_name}.{col_name}")

Analyzing Cluster Topology

# Group the known hosts by datacenter name
print("Cluster topology:")
hosts_by_dc = {}
for host in metadata.all_hosts():
    # Hosts without a datacenter value are filed under 'unknown'
    hosts_by_dc.setdefault(host.datacenter or 'unknown', []).append(host)

for dc, hosts in hosts_by_dc.items():
    print(f"\nDatacenter: {dc}")
    for host in hosts:
        status = "UP" if host.is_up else "DOWN"
        print(f"  {host.address} ({host.rack}): {status} - {host.release_version}")

# Look at the token ring, when a token map is available
token_map = metadata.token_map
if token_map:
    print(f"\nToken ring has {len(token_map.ring)} tokens")

    # Replica ownership for one keyspace
    keyspace_name = 'my_app'
    if keyspace_name in metadata.keyspaces:
        print(f"\nReplica distribution for keyspace '{keyspace_name}':")
        # Only the first 5 tokens are sampled
        for token in token_map.ring[:5]:
            replicas = token_map.get_replicas(keyspace_name, token)
            replica_addresses = [replica.address for replica in replicas]
            print(f"  Token {token.value}: {replica_addresses}")

Schema Evolution Tracking

def compare_schemas(old_metadata, new_metadata, keyspace_name):
    """Print the tables and columns that differ between two metadata snapshots.

    Parameters:
    - old_metadata: Earlier snapshot; must provide get_keyspace(name)
    - new_metadata: Later snapshot; must provide get_keyspace(name)
    - keyspace_name (str): Keyspace to compare

    Prints added/removed tables, then added/removed columns for each table
    present in both snapshots. Prints a notice and returns early when the
    keyspace is missing from either snapshot. Always returns None.
    """
    before = old_metadata.get_keyspace(keyspace_name)
    after = new_metadata.get_keyspace(keyspace_name)

    if not (before and after):
        print("Keyspace not found in one of the metadata snapshots")
        return

    # Table-level differences
    tables_before = set(before.tables)
    tables_after = set(after.tables)

    added_tables = tables_after - tables_before
    if added_tables:
        print(f"Added tables: {added_tables}")

    removed_tables = tables_before - tables_after
    if removed_tables:
        print(f"Removed tables: {removed_tables}")

    # Column-level differences, only for tables that exist in both snapshots
    for table_name in tables_before & tables_after:
        columns_before = set(before.tables[table_name].columns)
        columns_after = set(after.tables[table_name].columns)

        added_columns = columns_after - columns_before
        if added_columns:
            print(f"Table {table_name} - Added columns: {added_columns}")

        removed_columns = columns_before - columns_after
        if removed_columns:
            print(f"Table {table_name} - Removed columns: {removed_columns}")

# Usage
from types import SimpleNamespace


def snapshot_schema(metadata):
    """Copy keyspace, table, and column names out of a metadata object.

    The result is detached from the source object and exposes only what
    compare_schemas() reads: get_keyspace(name) returning an object with
    .tables, each table having .columns. Unknown keyspaces give None.
    """
    keyspaces = {
        ks_name: SimpleNamespace(tables={
            table_name: SimpleNamespace(columns=dict.fromkeys(table.columns))
            for table_name, table in ks.tables.items()
        })
        for ks_name, ks in metadata.keyspaces.items()
    }
    return SimpleNamespace(get_keyspace=keyspaces.get)


# Take a detached copy first: assigning cluster.metadata to both names would
# compare one object against itself and never report a change.
old_metadata = snapshot_schema(cluster.metadata)
# ... time passes, schema changes occur ...
cluster.metadata.rebuild_schema()
new_metadata = cluster.metadata

compare_schemas(old_metadata, new_metadata, 'my_app')

Dynamic Query Generation

from cassandra.metadata import protect_name, protect_value

def generate_insert_query(table_metadata, data):
    """Generate an INSERT query from table metadata and data.

    Parameters:
    - table_metadata: Table metadata providing keyspace_name, name and columns
    - data (dict): Mapping of column name to value; keys that are not columns
      of the table are ignored

    Returns:
    tuple: (query, values) where query uses '?' bind markers, so it must be
    prepared before execution, and values is a list in the same order as the
    columns named in the query

    Raises:
    ValueError: If no key in data names a column of the table
    """
    # Quote both halves of the qualified name; the keyspace can need
    # protection just as the table and column names can.
    table_name = (
        f"{protect_name(table_metadata.keyspace_name)}."
        f"{protect_name(table_metadata.name)}"
    )

    # Keep only the entries whose key is a column of the table.
    known = {
        col_name: value
        for col_name, value in data.items()
        if col_name in table_metadata.columns
    }
    if not known:
        raise ValueError("No valid columns found in data")

    column_list = ', '.join(protect_name(col_name) for col_name in known)
    placeholders = ', '.join('?' for _ in known)

    query = f"""
        INSERT INTO {table_name} ({column_list})
        VALUES ({placeholders})
    """

    return query.strip(), list(known.values())

# Usage
import uuid  # needed for uuid.uuid4() below

table = metadata.get_table('my_app', 'users')
data = {
    'id': uuid.uuid4(),
    'name': 'Alice Smith',
    'email': 'alice@example.com',
    'invalid_column': 'ignored'  # This will be filtered out
}

query, values = generate_insert_query(table, data)
print(f"Generated query: {query}")
print(f"Values: {values}")

# The generated query uses '?' bind markers, which the driver accepts only in
# prepared statements (plain string queries use %s), so prepare it first.
prepared = session.prepare(query)
session.execute(prepared, values)

Custom Metadata Introspection

def analyze_keyspace_complexity(keyspace_metadata):
    """Collect size and feature counts for one keyspace.

    Parameters:
    - keyspace_metadata: Object providing tables, user_types and functions
      mappings; each table provides columns, indexes and clustering_key

    Returns:
    dict: Counts of tables, columns, indexes, tables with a clustering key,
    tables with at least one static column, user types and functions, plus
    'column_types', the set of class names of the columns' cql_type values
    """
    column_total = 0
    index_total = 0
    clustered_tables = 0
    static_tables = 0
    type_names = set()

    for table in keyspace_metadata.tables.values():
        column_total += len(table.columns)
        index_total += len(table.indexes)

        # bool adds as 0 or 1, so each table counts at most once
        clustered_tables += bool(table.clustering_key)
        static_tables += any(col.is_static for col in table.columns.values())

        type_names.update(
            type(col.cql_type).__name__ for col in table.columns.values()
        )

    return {
        'total_tables': len(keyspace_metadata.tables),
        'total_columns': column_total,
        'total_indexes': index_total,
        'tables_with_clustering': clustered_tables,
        'tables_with_static_columns': static_tables,
        'user_types': len(keyspace_metadata.user_types),
        'functions': len(keyspace_metadata.functions),
        'column_types': type_names,
    }

# Usage
keyspace = metadata.get_keyspace('my_app')
if keyspace:
    complexity = analyze_keyspace_complexity(keyspace)
    print("Keyspace complexity analysis:")
    for metric, value in complexity.items():
        # column_types is a set; sort it so the listing is stable
        shown = sorted(value) if metric == 'column_types' else value
        print(f"  {metric}: {shown}")

Install with Tessl CLI

npx tessl i tessl/pypi-cassandra-driver

docs

async-io.md

auth-policies.md

cluster-session.md

cql-types.md

cqlengine-orm.md

index.md

metadata.md

query-execution.md

tile.json