CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cassandra-driver

Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

auth-policies.mddocs/

Authentication & Policies

Authentication providers, load balancing policies, retry strategies, and reconnection policies for robust cluster operations. The policy framework provides comprehensive configurability for production deployments.

Capabilities

Authentication

Authentication providers for secure cluster connections with various authentication mechanisms.

class AuthProvider:
    """Abstract factory producing a per-host Authenticator for secure connections."""

    def new_authenticator(self, host):
        """
        Build an Authenticator to use when connecting to *host*.

        Parameters:
        - host (Host): The host that requires authentication

        Returns:
        Authenticator: A fresh authenticator for this host
        """

class Authenticator:
    """Drives one SASL authentication exchange with a single server."""

    def initial_response(self):
        """
        Return the first message of the SASL exchange.

        Returns:
        bytes: Initial authentication response
        """

    def evaluate_challenge(self, challenge):
        """
        Compute the reply to a server-issued challenge.

        Parameters:
        - challenge (bytes): Challenge bytes from server

        Returns:
        bytes: Response to the challenge
        """

    def on_authentication_success(self, token):
        """
        Hook invoked once the server reports successful authentication.

        Parameters:
        - token (bytes): Success token from server
        """

class PlainTextAuthProvider(AuthProvider):
    """Credential-holding provider for SASL PLAIN (username/password) auth."""

    def __init__(self, username, password):
        """
        Store the credentials used for every host connection.

        Parameters:
        - username (str): Username for authentication
        - password (str): Password for authentication
        """

    def new_authenticator(self, host):
        """
        Build a PlainTextAuthenticator carrying the stored credentials.

        Parameters:
        - host (Host): Target host

        Returns:
        PlainTextAuthenticator: Authenticator instance
        """

class PlainTextAuthenticator(Authenticator):
    """SASL PLAIN authenticator performing username/password authentication."""

    def __init__(self, username, password):
        """
        Parameters:
        - username (str): Username for authentication
        - password (str): Password for authentication
        """

class SaslAuthProvider(AuthProvider):
    """Generic SASL authentication provider; requires the puresasl package."""

    def __init__(self, **sasl_kwargs):
        """
        Parameters:
        - sasl_kwargs: Keyword arguments forwarded to puresasl.client.SASLClient
        """

class SaslAuthenticator(Authenticator):
    """Generic SASL authenticator backed by puresasl."""

    def __init__(self, sasl_kwargs):
        """
        Parameters:
        - sasl_kwargs (dict): Arguments for SASLClient initialization
        """

Load Balancing Policies

Policies for selecting which hosts to use for query execution and managing host distances.

class HostDistance:
    """Distance categories used to rank hosts for connection management."""

    IGNORED = -1  # hosts the driver should not use at all
    LOCAL = 0     # hosts in the local datacenter
    REMOTE = 1    # hosts in remote datacenters

class LoadBalancingPolicy:
    """Base class for policies that choose and order hosts for query execution."""

    def distance(self, host):
        """
        Categorize *host* relative to this client.

        Parameters:
        - host (Host): The host to categorize

        Returns:
        int: HostDistance constant (LOCAL, REMOTE, or IGNORED)
        """

    def populate(self, cluster, hosts):
        """
        Prime the policy with the cluster and its currently known hosts.

        Parameters:
        - cluster (Cluster): The cluster instance
        - hosts (list): List of all known hosts
        """

    def make_query_plan(self, working_keyspace=None, query=None):
        """
        Produce the ordered sequence of hosts to attempt for one query.

        Parameters:
        - working_keyspace (str): Current keyspace
        - query (Statement): Query being executed

        Returns:
        list: Ordered list of Host objects to try
        """

    def on_up(self, host):
        """
        Notification hook: *host* has come online.

        Parameters:
        - host (Host): Host that came online
        """

    def on_down(self, host):
        """
        Notification hook: *host* has gone offline.

        Parameters:
        - host (Host): Host that went offline
        """

    def on_add(self, host):
        """
        Notification hook: a new *host* joined the cluster.

        Parameters:
        - host (Host): Host that was added
        """

    def on_remove(self, host):
        """
        Notification hook: *host* left the cluster.

        Parameters:
        - host (Host): Host that was removed
        """

class RoundRobinPolicy(LoadBalancingPolicy):
    def __init__(self):
        """
        Cycle through every known host in turn, treating each one
        as LOCAL distance.
        """

class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
    def __init__(self, local_dc=None, used_hosts_per_remote_dc=0):
        """
        Round-robin balancing that prefers hosts in the local datacenter.

        Parameters:
        - local_dc (str): Name of the local datacenter
        - used_hosts_per_remote_dc (int): How many hosts to use from each
          remote datacenter
        """

class TokenAwarePolicy(LoadBalancingPolicy):
    def __init__(self, child_policy, shuffle_replicas=False):
        """
        Token-aware load balancing wrapper that routes queries to replica nodes.

        Decisions that cannot be made from token/replica information are
        delegated to the wrapped child policy.

        Parameters:
        - child_policy (LoadBalancingPolicy): Underlying load balancing policy
        - shuffle_replicas (bool): Whether to shuffle replica order before
          trying them. Defaults to False, matching the upstream
          cassandra-driver signature (the previously documented default of
          True does not match the driver).
        """

class WhiteListRoundRobinPolicy(LoadBalancingPolicy):
    def __init__(self, hosts):
        """
        Round-robin balancing restricted to an explicit set of allowed hosts.

        Parameters:
        - hosts (list): List of allowed host addresses
        """

Retry Policies

Policies for handling query failures and determining retry behavior.

class RetryDecision:
    """Decision types for retry policy responses.

    NOTE(review): the upstream Python cassandra-driver exposes these as
    attributes of RetryPolicy itself (RETRY = 0, RETHROW = 1, IGNORE = 2,
    plus RETRY_NEXT_HOST = 3) -- both the names' home and the numeric
    values differ from this spec; confirm which contract callers rely on.
    """
    
    RETHROW = 0
    """Re-raise the exception without retrying"""
    
    RETRY = 1
    """Retry the query"""
    
    IGNORE = 2
    """Ignore the error and return an empty result"""

class RetryPolicy:
    """
    Base policy deciding whether a failed operation should be retried,
    ignored, or re-raised.  Every handler returns a tuple of
    (RetryDecision, consistency level to retry at, or None).
    """

    def on_read_timeout(self, query, consistency_level, required_responses, received_responses, data_retrieved, retry_num):
        """
        Handle read timeout errors.

        Parameters:
        - query (Statement): The query that timed out
        - consistency_level (int): Consistency level used
        - required_responses (int): Number of responses required
        - received_responses (int): Number of responses received
        - data_retrieved (bool): Whether data was retrieved before timeout
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (RetryDecision, ConsistencyLevel or None)
        """

    def on_write_timeout(self, query, consistency_level, write_type, required_responses, received_responses, retry_num):
        """
        Handle write timeout errors.

        Parameters:
        - query (Statement): The query that timed out
        - consistency_level (int): Consistency level used
        - write_type (str): Type of write operation
          (NOTE(review): this conflicts with the WriteType integer constants
          defined later in this spec, and the upstream cassandra-driver
          passes a WriteType constant here rather than a str -- confirm)
        - required_responses (int): Number of responses required
        - received_responses (int): Number of responses received
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (RetryDecision, ConsistencyLevel or None)
        """

    def on_unavailable(self, query, consistency_level, required_replicas, alive_replicas, retry_num):
        """
        Handle unavailable errors.

        Parameters:
        - query (Statement): The query that failed
        - consistency_level (int): Consistency level used
        - required_replicas (int): Number of replicas required
        - alive_replicas (int): Number of replicas alive
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (RetryDecision, ConsistencyLevel or None)
        """

    def on_request_error(self, query, consistency_level, error, retry_num):
        """
        Handle general request errors.

        Parameters:
        - query (Statement): The query that failed
        - consistency_level (int): Consistency level used
        - error (Exception): The error that occurred
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (RetryDecision, ConsistencyLevel or None)
        """

class FallthroughRetryPolicy(RetryPolicy):
    def __init__(self):
        """
        Never retry queries; always re-raise exceptions to the application.

        NOTE(review): the upstream cassandra-driver's default_retry_policy is
        RetryPolicy itself (which retries in limited cases), not this class --
        the previous claim that this is "the default retry policy" appears
        incorrect; confirm against the target driver version.
        """

class DowngradingConsistencyRetryPolicy(RetryPolicy):
    def __init__(self):
        """
        Retry queries with degraded consistency levels on certain failures.

        NOTE(review): the upstream cassandra-driver deprecated this policy
        (since 3.17) because silently lowering consistency can mask
        availability problems -- confirm it still exists in the target
        driver version before depending on it.
        
        Behavior:
        - Read timeouts: Retry once with ONE if data was retrieved
        - Write timeouts: Retry once with ONE for SIMPLE writes
        - Unavailable: Retry once with lower consistency level
        """

Reconnection Policies

Policies for managing reconnection delays when hosts become unavailable.

class ReconnectionPolicy:
    """Base class for policies controlling the timing of reconnection attempts."""

    def new_schedule(self):
        """
        Produce a fresh schedule of reconnection delays.

        Returns:
        generator: Generator yielding delay times in seconds
        """

class ConstantReconnectionPolicy(ReconnectionPolicy):
    def __init__(self, delay, max_attempts=None):
        """
        Reconnection policy with constant delay between attempts.

        Parameters:
        - delay (float): Delay in seconds between reconnection attempts
        - max_attempts (int): Maximum number of attempts (None for unlimited)

        NOTE(review): the upstream cassandra-driver defaults max_attempts
        to 64 rather than None -- confirm which contract this spec should
        mirror.
        """

class ExponentialReconnectionPolicy(ReconnectionPolicy):
    def __init__(self, base_delay, max_delay, max_attempts=None):
        """
        Reconnection policy with exponential backoff.

        Parameters:
        - base_delay (float): Base delay in seconds for first attempt
        - max_delay (float): Maximum delay in seconds
        - max_attempts (int): Maximum number of attempts (None for unlimited)

        NOTE(review): the upstream cassandra-driver defaults max_attempts
        to 64 rather than None -- confirm which contract this spec should
        mirror.
        """

Conviction Policies

Policies for determining when to mark hosts as failed.

class ConvictionPolicy:
    """Decides when accumulated connection failures should mark a host down."""

    def add_failure(self, host, connection_exc):
        """
        Register a connection failure against *host*.

        Parameters:
        - host (Host): The host that failed
        - connection_exc (Exception): The connection exception

        Returns:
        bool: True if the host should be convicted (marked as down)
        """

    def reset(self, host):
        """
        Clear any recorded failures for *host*.

        Parameters:
        - host (Host): The host to reset
        """

class SimpleConvictionPolicy(ConvictionPolicy):
    def __init__(self):
        """
        Simple conviction policy that marks hosts down on first failure.

        NOTE(review): in the upstream cassandra-driver, conviction policies
        are constructed per-host (the Cluster's conviction_policy_factory is
        called with the host), so the upstream __init__ takes a host argument
        -- confirm which signature this spec should mirror.
        """

Write Types

Constants for different write operation types used in retry policies.

class WriteType:
    """Constants identifying the kind of write a retry policy is handling."""

    SIMPLE = 0          # single-partition-key write (atomic and isolated)
    BATCH = 1           # multi-partition logged batch (atomic via batch log)
    UNLOGGED_BATCH = 2  # multi-partition batch without batch log (not atomic)
    COUNTER = 3         # counter write (should not be replayed)
    BATCH_LOG = 4       # initial write to the distributed batch log (internal)
    CAS = 5             # compare-and-set (conditional) write

Host State Listeners

Listeners for monitoring host state changes in the cluster.

class HostStateListener:
    """Callback interface for observing host membership and availability changes."""

    def on_add(self, host):
        """
        Invoked when *host* joins the cluster.

        Parameters:
        - host (Host): The host that was added
        """

    def on_up(self, host):
        """
        Invoked when *host* becomes reachable again.

        Parameters:
        - host (Host): The host that came online
        """

    def on_down(self, host):
        """
        Invoked when *host* becomes unreachable.

        Parameters:
        - host (Host): The host that went offline
        """

    def on_remove(self, host):
        """
        Invoked when *host* leaves the cluster.

        Parameters:
        - host (Host): The host that was removed
        """

Usage Examples

Authentication Setup

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider, SaslAuthProvider

# Basic username/password authentication
auth_provider = PlainTextAuthProvider(
    username='cassandra_user',
    password='secure_password'
)

# The provider is handed to the Cluster, which requests a fresh
# authenticator from it for each host connection (see AuthProvider)
cluster = Cluster(
    contact_points=['127.0.0.1'],
    auth_provider=auth_provider
)

# SASL authentication (requires puresasl package)
sasl_auth = SaslAuthProvider(
    mechanism='GSSAPI',
    service='cassandra',
    qops=['auth']
)

cluster_sasl = Cluster(
    contact_points=['127.0.0.1'],
    auth_provider=sasl_auth
)

Load Balancing Configuration

from cassandra.policies import (
    DCAwareRoundRobinPolicy, 
    TokenAwarePolicy,
    WhiteListRoundRobinPolicy,
    HostDistance
)

# Datacenter-aware load balancing
dc_aware_policy = DCAwareRoundRobinPolicy(
    local_dc='datacenter1',
    used_hosts_per_remote_dc=2  # Use 2 hosts from each remote DC
)

# Token-aware routing with DC-aware fallback
token_aware_policy = TokenAwarePolicy(dc_aware_policy)

# Whitelist policy for specific hosts only; hosts outside the list
# are not used (see WhiteListRoundRobinPolicy)
whitelist_policy = WhiteListRoundRobinPolicy([
    '192.168.1.10',
    '192.168.1.11', 
    '192.168.1.12'
])

# (Cluster is imported from cassandra.cluster as in the earlier examples)
cluster = Cluster(
    contact_points=['192.168.1.10'],
    load_balancing_policy=token_aware_policy
)

# Custom load balancing policy
class CustomLoadBalancingPolicy(LoadBalancingPolicy):
    def populate(self, cluster, hosts):
        # Keep a reference to the known hosts: the base-class stub does not
        # store them, and make_query_plan below relies on self.hosts.
        self.hosts = list(hosts)

    def distance(self, host):
        # Mark hosts in 192.168.1.x as local, others as remote
        if host.address.startswith('192.168.1.'):
            return HostDistance.LOCAL
        else:
            return HostDistance.REMOTE
    
    def make_query_plan(self, working_keyspace=None, query=None):
        # Custom host selection logic
        local_hosts = [h for h in self.hosts if self.distance(h) == HostDistance.LOCAL]
        remote_hosts = [h for h in self.hosts if self.distance(h) == HostDistance.REMOTE]
        
        # Return local hosts first, then remote
        return local_hosts + remote_hosts[:2]  # Max 2 remote hosts

custom_policy = CustomLoadBalancingPolicy()
cluster = Cluster(
    contact_points=['192.168.1.10'],
    load_balancing_policy=custom_policy
)

Retry Policy Configuration

from cassandra.policies import (
    FallthroughRetryPolicy,
    DowngradingConsistencyRetryPolicy,
    RetryPolicy,
    RetryDecision
)
from cassandra import ConsistencyLevel

# Use fallthrough policy (no retries)
# (created for comparison; not attached to the cluster below)
fallthrough_policy = FallthroughRetryPolicy()

# Use downgrading consistency policy
downgrading_policy = DowngradingConsistencyRetryPolicy()

cluster = Cluster(
    contact_points=['127.0.0.1'],
    default_retry_policy=downgrading_policy
)

# Custom retry policy
class AggressiveRetryPolicy(RetryPolicy):
    def on_read_timeout(self, query, consistency_level, required_responses, 
                       received_responses, data_retrieved, retry_num):
        if retry_num < 3:  # Retry up to 3 times
            if data_retrieved:
                # Retry with ONE if we got some data
                return (RetryDecision.RETRY, ConsistencyLevel.ONE)
            elif received_responses > 0:
                # Retry with lower consistency if we got any response
                return (RetryDecision.RETRY, ConsistencyLevel.ONE)
        
        # Give up after 3 retries
        return (RetryDecision.RETHROW, None)
    
    def on_write_timeout(self, query, consistency_level, write_type, 
                        required_responses, received_responses, retry_num):
        # NOTE(review): the upstream cassandra-driver passes write_type as a
        # WriteType constant (int), so this string comparison would never
        # match there -- confirm which contract applies before reusing this.
        if retry_num < 2 and write_type == 'SIMPLE':
            # Retry simple writes once with ONE
            return (RetryDecision.RETRY, ConsistencyLevel.ONE)
        
        return (RetryDecision.RETHROW, None)

aggressive_policy = AggressiveRetryPolicy()

# Apply retry policy to cluster or individual statements
cluster.default_retry_policy = aggressive_policy

# Or apply to specific statements
# NOTE(review): a statement-level retry_policy is expected to take
# precedence over the cluster default -- confirm against the driver docs
from cassandra.query import SimpleStatement
stmt = SimpleStatement("SELECT * FROM users WHERE id = %s")
stmt.retry_policy = aggressive_policy

Reconnection Policy Configuration

from cassandra.policies import (
    ConstantReconnectionPolicy,
    ExponentialReconnectionPolicy
)

# Constant delay reconnection
constant_policy = ConstantReconnectionPolicy(
    delay=5.0,  # Wait 5 seconds between attempts
    max_attempts=10  # Try up to 10 times
)

# Exponential backoff reconnection: delays grow from base_delay toward
# the max_delay cap on successive attempts
exponential_policy = ExponentialReconnectionPolicy(
    base_delay=1.0,    # Start with 1 second
    max_delay=60.0,    # Cap at 60 seconds
    max_attempts=None  # Retry indefinitely
)

cluster = Cluster(
    contact_points=['127.0.0.1'],
    reconnection_policy=exponential_policy
)

# Custom reconnection policy
class CustomReconnectionPolicy(ReconnectionPolicy):
    def __init__(self, delays):
        self.delays = delays
    
    def new_schedule(self):
        # Emit the caller-supplied delay sequence first...
        yield from self.delays
        
        # ...then settle into a constant 30 second retry interval forever
        while True:
            yield 30.0

custom_reconnect = CustomReconnectionPolicy([1, 2, 5, 10, 15, 20])
cluster = Cluster(
    contact_points=['127.0.0.1'],
    reconnection_policy=custom_reconnect
)

Host State Monitoring

from cassandra.policies import HostStateListener

class MyHostStateListener(HostStateListener):
    """Logs every host membership / availability change to stdout."""

    def on_add(self, host):
        print(f"Host added: {host.address}")
    
    def on_up(self, host):
        print(f"Host came online: {host.address}")
        # Could trigger application-level notifications
        
    def on_down(self, host):
        print(f"Host went offline: {host.address}")
        # Could trigger alerting systems
        
    def on_remove(self, host):
        print(f"Host removed: {host.address}")

# Register the listener
cluster = Cluster(contact_points=['127.0.0.1'])
cluster.register_listener(MyHostStateListener())
session = cluster.connect()

# The listener will now receive notifications about host state changes

Complete Policy Configuration

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
    DCAwareRoundRobinPolicy,
    TokenAwarePolicy, 
    DowngradingConsistencyRetryPolicy,
    ExponentialReconnectionPolicy,
    SimpleConvictionPolicy
)

# Complete production configuration
auth_provider = PlainTextAuthProvider(
    username='app_user',
    password='secure_password'
)

# Route to replicas first, falling back to DC-aware round-robin
load_balancing_policy = TokenAwarePolicy(
    DCAwareRoundRobinPolicy(
        local_dc='DC1',
        used_hosts_per_remote_dc=1
    )
)

retry_policy = DowngradingConsistencyRetryPolicy()

reconnection_policy = ExponentialReconnectionPolicy(
    base_delay=1.0,
    max_delay=60.0
)

cluster = Cluster(
    contact_points=['10.0.1.1', '10.0.1.2', '10.0.1.3'],
    port=9042,
    auth_provider=auth_provider,
    load_balancing_policy=load_balancing_policy,
    default_retry_policy=retry_policy,
    reconnection_policy=reconnection_policy,
    # Cluster expects a factory (class or callable) invoked per host, not a
    # policy instance, and the parameter is conviction_policy_factory --
    # the previous `conviction_policy=instance` keyword is not accepted.
    conviction_policy_factory=SimpleConvictionPolicy,
    compression=True,
    protocol_version=4
)

session = cluster.connect('my_keyspace')

Install with Tessl CLI

npx tessl i tessl/pypi-cassandra-driver

docs

async-io.md

auth-policies.md

cluster-session.md

cql-types.md

cqlengine-orm.md

index.md

metadata.md

query-execution.md

tile.json