tessl/pypi-cassandra-driver

Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities

Core Connectivity

Cluster connection management, session handling, and connection pooling. The Cluster and Session classes form the foundation of all cassandra-driver operations.

Capabilities

Cluster Management

The Cluster class manages connections to multiple Cassandra nodes, handles topology changes, and provides load balancing across the cluster.

class Cluster:
    def __init__(
        self,
        contact_points=None,
        port=9042,
        executor_threads=2,
        auth_provider=None,
        load_balancing_policy=None,
        reconnection_policy=None,
        default_retry_policy=None,
        conviction_policy=None,
        metrics_enabled=False,
        connection_class=None,
        ssl_context=None,
        ssl_options=None,
        sockopts=None,
        cql_version=None,
        protocol_version=None,
        is_default_protocol_version=None,
        compression=True,
        max_schema_agreement_wait=10,
        control_connection_timeout=2.0,
        idle_heartbeat_interval=30,
        idle_heartbeat_timeout=30,
        schema_event_refresh_window=2,
        topology_event_refresh_window=10,
        status_event_refresh_window=2,
        prepare_on_all_hosts=True,
        reprepare_on_up=True,
        max_requests_per_connection=None,
        **kwargs
    ):
        """
        Initialize a Cluster instance to manage Cassandra connections.

        Parameters:
        - contact_points (list): List of host addresses to initially connect to
        - port (int): Port number for Cassandra connections (default: 9042)
        - executor_threads (int): Number of threads for I/O operations
        - auth_provider (AuthProvider): Authentication provider for credentials
        - load_balancing_policy (LoadBalancingPolicy): Policy for host selection
        - reconnection_policy (ReconnectionPolicy): Policy for reconnection delays
        - default_retry_policy (RetryPolicy): Default retry policy for failed operations
        - conviction_policy (ConvictionPolicy): Policy for marking hosts as failed
        - metrics_enabled (bool): Enable connection and request metrics
        - connection_class: Connection implementation class
        - ssl_context: SSL context for encrypted connections
        - ssl_options (dict): SSL configuration options
        - sockopts (list): Socket options tuples
        - cql_version (str): CQL version to use
        - protocol_version (int): Native protocol version (1-5 in recent driver releases; must also be supported by the server)
        - compression (bool or str): Enable compression ('snappy', 'lz4', or True)
        - max_schema_agreement_wait (float): Max time to wait for schema agreement
        - control_connection_timeout (float): Timeout for control connection operations
        - idle_heartbeat_interval (float): Interval between heartbeat messages
        - idle_heartbeat_timeout (float): Timeout for heartbeat responses
        - prepare_on_all_hosts (bool): Prepare statements on all hosts
        - reprepare_on_up (bool): Re-prepare statements when hosts come back up
        - max_requests_per_connection (int): Max concurrent requests per connection
        """

    def connect(self, keyspace=None):
        """
        Create a new Session for this cluster.

        Parameters:
        - keyspace (str): Default keyspace for the session

        Returns:
        Session: A new session connected to the cluster
        """

    def shutdown(self):
        """
        Shut down this cluster and all associated sessions.
        Closes all connections and stops background threads.
        """

    def add_host(self, address, datacenter=None, rack=None, signal=True):
        """
        Add a host to the cluster.

        Parameters:
        - address (str): Host address to add
        - datacenter (str): Datacenter name
        - rack (str): Rack name  
        - signal (bool): Whether to signal policy changes
        """

    def remove_host(self, host):
        """
        Remove a host from the cluster.

        Parameters:
        - host (Host): Host object to remove
        """

    @property
    def metadata(self):
        """Metadata: Cluster metadata including keyspace and table information"""

    @property
    def metrics(self):
        """Metrics: Connection and request metrics if enabled"""

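Because `connect()` and `shutdown()` bracket the lifetime of every session, it can help to pair them in a context manager. The following is a minimal sketch, not part of the driver: `cluster_session` is a hypothetical helper name, and it relies only on the `connect(keyspace)` and `shutdown()` methods listed above.

```python
from contextlib import contextmanager


@contextmanager
def cluster_session(cluster, keyspace=None):
    """Yield a session from `cluster` and shut the cluster down on exit.

    `cluster_session` is a hypothetical helper. `cluster` can be any object
    exposing connect(keyspace) and shutdown(), such as a Cluster instance.
    """
    try:
        session = cluster.connect(keyspace)
        yield session
    finally:
        # Runs even if connect() or the body of the with-block raises.
        # Per the reference above, Cluster.shutdown() also shuts down
        # all associated sessions.
        cluster.shutdown()
```

With a real cluster this would be used as `with cluster_session(Cluster(contact_points=['127.0.0.1']), 'my_keyspace') as session: ...`, so the background threads are stopped even when a query raises.
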
Session Operations

The Session class executes queries and manages prepared statements within a keyspace context.

class Session:
    def execute(self, query, parameters=None, timeout=None, trace=False):
        """
        Execute a query synchronously.

        Parameters:
        - query (str or Statement): CQL query string or Statement object
        - parameters (list or dict): Query parameters for placeholder binding
        - timeout (float): Query timeout in seconds
        - trace (bool): Enable query tracing

        Returns:
        ResultSet: Query results with row data and metadata

        Raises:
        - Unavailable: Not enough replicas available
        - ReadTimeout: Read operation timed out
        - WriteTimeout: Write operation timed out
        - InvalidRequest: Invalid query or parameters
        """

    def execute_async(self, query, parameters=None, trace=False):
        """
        Execute a query asynchronously.

        Parameters:
        - query (str or Statement): CQL query string or Statement object
        - parameters (list or dict): Query parameters for placeholder binding
        - trace (bool): Enable query tracing

        Returns:
        ResponseFuture: Future object for asynchronous result handling
        """

    def prepare(self, query):
        """
        Prepare a query for efficient repeated execution.

        Parameters:
        - query (str): CQL query string with parameter placeholders

        Returns:
        PreparedStatement: Prepared statement object for binding and execution
        """

    def shutdown(self):
        """
        Shut down this session and close all connections.
        """

    def set_keyspace(self, keyspace):
        """
        Set the default keyspace for this session.

        Parameters:
        - keyspace (str): Keyspace name to use as default
        """

    @property
    def keyspace(self):
        """str: Current default keyspace for this session"""

    @property
    def cluster(self):
        """Cluster: The cluster this session is connected to"""

    @property
    def hosts(self):
        """set: Set of Host objects in the cluster"""

    @property
    def user_type_deserializers(self):
        """dict: User-defined type deserializers"""

    @property
    def encoder(self):
        """Encoder: Parameter encoder for this session"""

    @property
    def row_factory(self):
        """callable: Factory function for creating row objects from results"""

    @row_factory.setter
    def row_factory(self, factory):
        """Set the row factory for result processing"""

    @property
    def default_timeout(self):
        """float: Default timeout for queries executed by this session"""

    @default_timeout.setter
    def default_timeout(self, timeout):
        """Set the default timeout for queries"""

    @property
    def default_consistency_level(self):
        """int: Default consistency level for queries"""

    @default_consistency_level.setter 
    def default_consistency_level(self, consistency_level):
        """Set the default consistency level"""

    @property
    def default_serial_consistency_level(self):
        """int: Default serial consistency level for conditional queries"""

    @default_serial_consistency_level.setter
    def default_serial_consistency_level(self, serial_consistency_level):
        """Set the default serial consistency level"""

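Since `prepare()` is meant for repeated execution, an application usually wants to prepare each query string only once per session. The sketch below shows one way to do that; `StatementCache` is a hypothetical name, not a driver class, and it uses only the `prepare(query)` and `execute(query, parameters)` methods described above.

```python
class StatementCache:
    """Prepare each distinct CQL string once per session and reuse it.

    Hypothetical helper for illustration. It is not thread-safe: two
    threads may both prepare the same string the first time it is seen.
    """

    def __init__(self, session):
        self._session = session
        self._prepared = {}

    def get(self, cql):
        # Only call session.prepare() the first time a query string is seen.
        if cql not in self._prepared:
            self._prepared[cql] = self._session.prepare(cql)
        return self._prepared[cql]

    def execute(self, cql, parameters=None):
        # Execute the cached prepared statement with the given parameters.
        return self._session.execute(self.get(cql), parameters)
```

A caller would create one `StatementCache(session)` next to the session and route repeated queries through `cache.execute(...)`.
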
Response Handling

Asynchronous response objects for non-blocking query execution.

class ResponseFuture:
    def result(self, timeout=None):
        """
        Block until the query completes and return the result.

        Parameters:
        - timeout (float): Maximum time to wait for result

        Returns:
        ResultSet: Query results

        Raises:
        - Timeout: Operation timed out
        - Various query-specific exceptions
        """

    def get_query_trace(self, max_wait=2.0):
        """
        Get the query trace if tracing was enabled.

        Parameters:
        - max_wait (float): Maximum time to wait for trace data

        Returns:
        QueryTrace: Trace information for the executed query
        """

    def add_callback(self, fn, *args, **kwargs):
        """
        Add a callback function to be called when query completes successfully.

        Parameters:
        - fn (callable): Function to call with (result, *args, **kwargs)
        - args: Additional positional arguments for callback
        - kwargs: Additional keyword arguments for callback
        """

    def add_errback(self, fn, *args, **kwargs):
        """
        Add an error callback function to be called if query fails.

        Parameters:
        - fn (callable): Function to call with (exception, *args, **kwargs)
        - args: Additional positional arguments for callback
        - kwargs: Additional keyword arguments for callback
        """

    def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):
        """
        Add both success and error callbacks.

        Parameters:
        - callback (callable): Success callback function
        - errback (callable): Error callback function
        - callback_args (tuple): Arguments for success callback
        - callback_kwargs (dict): Keyword arguments for success callback
        - errback_args (tuple): Arguments for error callback
        - errback_kwargs (dict): Keyword arguments for error callback
        """

    @property
    def query(self):
        """str or Statement: The query that was executed"""

    @property
    def session(self):
        """Session: The session used to execute the query"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages of results available"""

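The callback methods above make it possible to bridge a `ResponseFuture` into other future types. As an illustrative sketch (the function name `to_std_future` is an assumption, not a driver API), the following wraps one in a standard-library `concurrent.futures.Future` using `add_callbacks`:

```python
from concurrent.futures import Future


def to_std_future(response_future):
    """Wrap a driver ResponseFuture in a concurrent.futures.Future.

    Hypothetical helper: it relies only on add_callbacks(callback, errback)
    as documented above.
    """
    std_future = Future()
    response_future.add_callbacks(
        callback=std_future.set_result,      # called with the query result
        errback=std_future.set_exception,    # called with the exception
    )
    return std_future
```

The returned object can then be passed to utilities such as `concurrent.futures.wait` or `as_completed` alongside futures from other sources.
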
Result Processing

Result sets and paging support for handling large query results.

class ResultSet:
    def __iter__(self):
        """Iterate over result rows"""

    def __len__(self):
        """Get the number of rows in current page"""

    def __getitem__(self, index):
        """Get a row by index"""

    @property
    def current_rows(self):
        """list: Rows in the current page"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages available"""

    @property
    def response_future(self):
        """ResponseFuture: Future object used to fetch this result"""

    @property
    def column_names(self):
        """list: Names of columns in the result set"""

    @property
    def column_types(self):
        """list: CQL types of columns in the result set"""

class PagedResult:
    def __init__(self, future, session):
        """
        Initialize a paged result iterator.

        Parameters:
        - future (ResponseFuture): Initial response future
        - session (Session): Session for fetching additional pages
        """

    def __iter__(self):
        """Iterate over all rows across all pages"""

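When a query returns many rows, processing them in fixed-size batches keeps memory use bounded. The sketch below depends only on the result object being iterable, as `__iter__` above indicates; `iter_chunks` is a hypothetical helper name, and whether iteration continues transparently across pages is left to the driver.

```python
from itertools import islice


def iter_chunks(rows, size):
    """Yield lists of up to `size` rows from any iterable of rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        # islice pulls at most `size` rows without consuming the rest.
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```

For example, `for batch in iter_chunks(session.execute("SELECT * FROM users"), 500): ...` would hand rows to the loop body 500 at a time.
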
Connection Pool Management

Host and connection pool management for optimal performance.

class Host:
    def __init__(self, address, datacenter=None, rack=None):
        """
        Represent a Cassandra host in the cluster.

        Parameters:
        - address (str): Host IP address or hostname
        - datacenter (str): Datacenter name
        - rack (str): Rack name
        """

    @property
    def address(self):
        """str: Host address"""

    @property
    def datacenter(self):
        """str: Datacenter name"""

    @property
    def rack(self):
        """str: Rack name"""

    @property
    def is_up(self):
        """bool: Whether the host is currently up"""

    @property
    def release_version(self):
        """str: Cassandra release version on this host"""

class HostConnectionPool:
    def __init__(self, host, host_distance, session):
        """
        Manage connections to a specific host.

        Parameters:
        - host (Host): Target host
        - host_distance (int): Distance category for this host
        - session (Session): Parent session
        """

    def borrow_connection(self, timeout):
        """
        Borrow a connection from the pool.

        Parameters:
        - timeout (float): Maximum time to wait for connection

        Returns:
        Connection: Available connection object
        """

    def return_connection(self, connection):
        """
        Return a connection to the pool.

        Parameters:
        - connection (Connection): Connection to return
        """

    @property
    def host(self):
        """Host: The host this pool connects to"""

    @property
    def is_shutdown(self):
        """bool: Whether this pool has been shut down"""

    @property
    def open_count(self):
        """int: Number of open connections in the pool"""

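The `Host` properties above are enough to build a quick view of cluster health. The following sketch groups host addresses by datacenter and up/down state; `summarize_hosts` is a hypothetical helper, and it reads only the `address`, `datacenter`, and `is_up` attributes.

```python
from collections import defaultdict


def summarize_hosts(hosts):
    """Return {datacenter: {"up": [...], "down": [...]}} of host addresses.

    Hypothetical helper. Any falsy is_up value (including an unknown
    state represented as None) is counted as "down".
    """
    summary = defaultdict(lambda: {"up": [], "down": []})
    for host in hosts:
        state = "up" if host.is_up else "down"
        summary[host.datacenter][state].append(host.address)
    return dict(summary)
```

It could be called with the `hosts` property of a session, for instance `summarize_hosts(session.hosts)`.
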
Exception Classes

Specific exceptions for cluster and session operations.

class NoHostAvailable(Exception):
    """No hosts are available for connection."""
    
    def __init__(self, message, errors):
        """
        Parameters:
        - message (str): Error message
        - errors (dict): Dict mapping Host objects to their connection errors
        """
    
    @property
    def errors(self):
        """dict: Connection errors by host"""

class QueryExhausted(Exception):
    """All query retries have been exhausted."""
    
    def __init__(self, message, last_host):
        """
        Parameters:
        - message (str): Error message  
        - last_host (Host): Last host that was tried
        """

class UserTypeDoesNotExist(Exception):
    """Referenced user-defined type does not exist."""
    
    def __init__(self, keyspace, user_type):
        """
        Parameters:
        - keyspace (str): Keyspace name
        - user_type (str): User-defined type name
        """

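`NoHostAvailable` carries a per-host `errors` mapping, which is usually more informative than the top-level message. A minimal sketch for turning that mapping into log lines follows; `describe_host_errors` is a hypothetical name, and it assumes only the `errors` attribute documented above.

```python
def describe_host_errors(exc):
    """Return one formatted line per host from an exception's `errors` dict.

    Hypothetical helper. Exceptions without an `errors` attribute yield
    an empty list instead of raising.
    """
    errors = getattr(exc, "errors", None) or {}
    return [
        f"{host}: {type(error).__name__}: {error}"
        for host, error in errors.items()
    ]
```

In an `except NoHostAvailable as exc:` block, the returned lines can be logged individually to show which contact points failed and why.
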
Usage Examples

Basic Cluster Setup

import uuid

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy

# Set up authentication
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')

# Set up load balancing
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='datacenter1')

# Create cluster with custom configuration
cluster = Cluster(
    contact_points=['127.0.0.1', '127.0.0.2'],
    port=9042,
    auth_provider=auth_provider,
    load_balancing_policy=load_balancing_policy,
    protocol_version=4
)

# Connect and select the default keyspace
session = cluster.connect()
session.set_keyspace('my_keyspace')

# Execute a synchronous query with a bound parameter
user_id = uuid.uuid4()  # placeholder value; assumes users.id is a uuid column
result = session.execute("SELECT * FROM users WHERE id = %s", [user_id])
for row in result:
    print(f"User: {row.name}")

# Execute an asynchronous query and block until it completes
future = session.execute_async("SELECT * FROM users")
result = future.result()

# Clean up: closes all connections and stops background threads
cluster.shutdown()

Prepared Statement Usage

import uuid
from datetime import datetime, timezone

# Prepare a statement once for repeated execution
insert_stmt = session.prepare("""
    INSERT INTO users (id, name, email, created_at)
    VALUES (?, ?, ?, ?)
""")

# Execute with different parameters
session.execute(insert_stmt, [
    uuid.uuid4(),
    'Alice Smith',
    'alice@example.com',
    datetime.now(timezone.utc)  # explicit UTC rather than naive local time
])

session.execute(insert_stmt, [
    uuid.uuid4(),
    'Bob Jones',
    'bob@example.com',
    datetime.now(timezone.utc)
])

Asynchronous Operations

def handle_success(result):
    rows = list(result)  # materialize so the rows can be counted and iterated
    print(f"Query succeeded with {len(rows)} rows")
    for row in rows:
        print(f"User: {row.name}")

def handle_error(exception):
    print(f"Query failed: {exception}")

# Execute with callbacks
future = session.execute_async("SELECT * FROM users")
future.add_callback(handle_success)
future.add_errback(handle_error)

# Or wait for result
try:
    result = future.result(timeout=10.0)
    print(f"Got {len(result.current_rows)} rows in the current page")
except Exception as e:
    print(f"Query failed: {e}")

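A common use of `execute_async` is to issue several requests before waiting on any of them. The sketch below does this for a list of parameter sets and collects each outcome in order; `execute_all` is a hypothetical helper, and it places no bound on the number of in-flight requests, so it is only suitable for small batches.

```python
def execute_all(session, statement, parameter_sets):
    """Issue one async request per parameter set, then gather outcomes.

    Hypothetical helper. Returns a list of (ok, value) tuples in input
    order: value is the result when ok is True, otherwise the exception
    raised by that request.
    """
    # Start every request first so they run concurrently.
    futures = [
        session.execute_async(statement, params) for params in parameter_sets
    ]
    outcomes = []
    for future in futures:
        try:
            outcomes.append((True, future.result()))
        except Exception as exc:
            outcomes.append((False, exc))
    return outcomes
```

For larger workloads, the driver's own `cassandra.concurrent.execute_concurrent_with_args` limits concurrency and is likely the better fit.
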