Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Authentication providers, load balancing policies, retry strategies, and reconnection policies for robust cluster operations. The policy framework provides comprehensive configurability for production deployments.
Authentication providers for secure cluster connections with various authentication mechanisms.
class AuthProvider:
    # Abstract factory: registered with a Cluster and asked for a fresh
    # Authenticator whenever a new connection must authenticate.

    def new_authenticator(self, host):
        """Build an Authenticator for a single host.

        Args:
            host (Host): Host the new connection will authenticate against.

        Returns:
            Authenticator: Authenticator instance bound to that host.
        """
class Authenticator:
    # One instance drives the SASL exchange for a single connection.

    def initial_response(self):
        """Produce the first message of the SASL exchange.

        Returns:
            bytes: Initial authentication response.
        """

    def evaluate_challenge(self, challenge):
        """Answer a challenge sent by the server.

        Args:
            challenge (bytes): Challenge bytes from the server.

        Returns:
            bytes: Response to send back.
        """

    def on_authentication_success(self, token):
        """Hook invoked once the server accepts authentication.

        Args:
            token (bytes): Success token from the server.
        """
class PlainTextAuthProvider(AuthProvider):
    def __init__(self, username, password):
        """Credential-based provider for username/password authentication.

        Args:
            username (str): Username to authenticate with.
            password (str): Password to authenticate with.
        """

    def new_authenticator(self, host):
        """Return a PlainTextAuthenticator for one host.

        Args:
            host (Host): Target host.

        Returns:
            PlainTextAuthenticator: Authenticator for the new connection.
        """
class PlainTextAuthenticator(Authenticator):
    def __init__(self, username, password):
        """Authenticator implementing the SASL PLAIN mechanism.

        Args:
            username (str): Username to send.
            password (str): Password to send.
        """
class SaslAuthProvider(AuthProvider):
    def __init__(self, **sasl_kwargs):
        """Generic SASL provider (requires the puresasl package).

        Args:
            sasl_kwargs: Keyword arguments forwarded to
                puresasl.client.SASLClient.
        """
class SaslAuthenticator(Authenticator):
    def __init__(self, sasl_kwargs):
        """Generic SASL authenticator backed by puresasl.

        Args:
            sasl_kwargs (dict): Arguments used to initialize the SASLClient.
        """

# Policies for selecting which hosts to use for query execution and
# managing host distances.
class HostDistance:
    """Symbolic names for how 'near' a host is considered to be."""

    LOCAL = 0     # Hosts in the local datacenter
    REMOTE = 1    # Hosts in remote datacenters
    IGNORED = -1  # Hosts that should not be used at all
class LoadBalancingPolicy:
    # Base interface: subclasses decide host distances and the order in
    # which hosts are tried for each query.

    def distance(self, host):
        """Classify a host by distance.

        Args:
            host (Host): Host to categorize.

        Returns:
            int: One of the HostDistance constants (LOCAL, REMOTE, IGNORED).
        """

    def populate(self, cluster, hosts):
        """Prime the policy with initial cluster state.

        Args:
            cluster (Cluster): Cluster instance the policy serves.
            hosts (list): All hosts currently known.
        """

    def make_query_plan(self, working_keyspace=None, query=None):
        """Produce the ordered sequence of hosts to try for a query.

        Args:
            working_keyspace (str): Keyspace currently in use.
            query (Statement): Statement about to be executed.

        Returns:
            list: Host objects in the order they should be attempted.
        """

    def on_up(self, host):
        """Notification hook: the given host came online.

        Args:
            host (Host): Host that came online.
        """

    def on_down(self, host):
        """Notification hook: the given host went offline.

        Args:
            host (Host): Host that went offline.
        """

    def on_add(self, host):
        """Notification hook: a new host joined the cluster.

        Args:
            host (Host): Host that was added.
        """

    def on_remove(self, host):
        """Notification hook: a host left the cluster.

        Args:
            host (Host): Host that was removed.
        """
class RoundRobinPolicy(LoadBalancingPolicy):
    def __init__(self):
        """Round-robin balancing across every known host.

        All hosts are treated as LOCAL distance.
        """
class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
    def __init__(self, local_dc=None, used_hosts_per_remote_dc=0):
        """Round-robin balancing that prefers the local datacenter.

        Args:
            local_dc (str): Name of the datacenter to treat as local.
            used_hosts_per_remote_dc (int): How many hosts to use in each
                remote datacenter.
        """
class TokenAwarePolicy(LoadBalancingPolicy):
    def __init__(self, child_policy, shuffle_replicas=True):
        """Wrapper that routes each query to its replica nodes first.

        Args:
            child_policy (LoadBalancingPolicy): Policy used to order the
                remaining (non-replica) hosts.
            shuffle_replicas (bool): Randomize replica order when True.
        """
class WhiteListRoundRobinPolicy(LoadBalancingPolicy):
    def __init__(self, hosts):
        """Round-robin balancing restricted to an allow-list of hosts.

        Args:
            hosts (list): Addresses of the only hosts that may be used.
        """

# Policies for handling query failures and determining retry behavior.
class RetryDecision:
    """Outcomes a RetryPolicy may choose for a failed request."""

    RETHROW = 0  # Re-raise the exception without retrying
    RETRY = 1    # Retry the query
    IGNORE = 2   # Swallow the error and return an empty result
class RetryPolicy:
    # Each handler returns a (RetryDecision, consistency) pair telling the
    # driver whether and how to retry the failed request.

    def on_read_timeout(self, query, consistency_level, required_responses, received_responses, data_retrieved, retry_num):
        """Decide what to do after a read timeout.

        Args:
            query (Statement): Query that timed out.
            consistency_level (int): Consistency level that was used.
            required_responses (int): Responses needed to succeed.
            received_responses (int): Responses actually received.
            data_retrieved (bool): Whether any data arrived before the timeout.
            retry_num (int): Retries already attempted.

        Returns:
            tuple: (RetryDecision, ConsistencyLevel or None)
        """

    def on_write_timeout(self, query, consistency_level, write_type, required_responses, received_responses, retry_num):
        """Decide what to do after a write timeout.

        Args:
            query (Statement): Query that timed out.
            consistency_level (int): Consistency level that was used.
            write_type (str): Kind of write operation.
            required_responses (int): Responses needed to succeed.
            received_responses (int): Responses actually received.
            retry_num (int): Retries already attempted.

        Returns:
            tuple: (RetryDecision, ConsistencyLevel or None)
        """

    def on_unavailable(self, query, consistency_level, required_replicas, alive_replicas, retry_num):
        """Decide what to do when too few replicas are alive.

        Args:
            query (Statement): Query that failed.
            consistency_level (int): Consistency level that was used.
            required_replicas (int): Replicas required by that level.
            alive_replicas (int): Replicas currently alive.
            retry_num (int): Retries already attempted.

        Returns:
            tuple: (RetryDecision, ConsistencyLevel or None)
        """

    def on_request_error(self, query, consistency_level, error, retry_num):
        """Decide what to do after a general request error.

        Args:
            query (Statement): Query that failed.
            consistency_level (int): Consistency level that was used.
            error (Exception): Error that occurred.
            retry_num (int): Retries already attempted.

        Returns:
            tuple: (RetryDecision, ConsistencyLevel or None)
        """
class FallthroughRetryPolicy(RetryPolicy):
    def __init__(self):
        """Policy that never retries; every error is re-raised.

        This is the default retry policy.
        """
class DowngradingConsistencyRetryPolicy(RetryPolicy):
    def __init__(self):
        """Retry with a weaker consistency level after certain failures.

        Behavior:
        - Read timeout: retry once at ONE if data was retrieved.
        - Write timeout: retry once at ONE for SIMPLE writes.
        - Unavailable: retry once at a lower consistency level.
        """

# Policies for managing reconnection delays when hosts become unavailable.
class ReconnectionPolicy:
    def new_schedule(self):
        """Start a fresh reconnection schedule.

        Returns:
            generator: Yields successive delays, in seconds, to wait
            between reconnection attempts.
        """
class ConstantReconnectionPolicy(ReconnectionPolicy):
    def __init__(self, delay, max_attempts=None):
        """Reconnect with a fixed delay between attempts.

        Args:
            delay (float): Seconds to wait between reconnection attempts.
            max_attempts (int): Attempt limit; None means unlimited.
        """
class ExponentialReconnectionPolicy(ReconnectionPolicy):
    def __init__(self, base_delay, max_delay, max_attempts=None):
        """Reconnect with exponentially growing delays.

        Args:
            base_delay (float): Delay in seconds for the first attempt.
            max_delay (float): Upper bound on the delay, in seconds.
            max_attempts (int): Attempt limit; None means unlimited.
        """

# Policies for determining when to mark hosts as failed.
class ConvictionPolicy:
    def add_failure(self, host, connection_exc):
        """Record one connection failure against a host.

        Args:
            host (Host): Host that failed.
            connection_exc (Exception): The connection error raised.

        Returns:
            bool: True when the host should be convicted (marked down).
        """

    def reset(self, host):
        """Clear accumulated failure history for a host.

        Args:
            host (Host): Host whose failure tracking is discarded.
        """
class SimpleConvictionPolicy(ConvictionPolicy):
    def __init__(self):
        """Conviction policy that marks a host down on its first failure."""

# Constants for different write operation types used in retry policies.
class WriteType:
    """Symbolic names for the kinds of write operations reported to retry
    policies."""

    SIMPLE = 0          # Write to a single partition key (atomic and isolated)
    BATCH = 1           # Multi-partition write via the distributed batch log (atomic)
    UNLOGGED_BATCH = 2  # Multi-partition write without the batch log (not atomic)
    COUNTER = 3         # Counter write (should not be replayed)
    BATCH_LOG = 4       # Initial write to the distributed batch log (internal)
    CAS = 5             # Compare-and-set (conditional) write

# Listeners for monitoring host state changes in the cluster.
class HostStateListener:
    # Callback interface; register an implementation with the cluster
    # (see cluster.register_listener in the examples below) to observe
    # host state transitions.

    def on_add(self, host):
        """Called when a new host is added to the cluster.

        Args:
            host (Host): The host that was added.
        """

    def on_up(self, host):
        """Called when a host comes back online.

        Args:
            host (Host): The host that came online.
        """

    def on_down(self, host):
        """Called when a host goes offline.

        Args:
            host (Host): The host that went offline.
        """

    def on_remove(self, host):
        """Called when a host is removed from the cluster.

        Args:
            host (Host): The host that was removed.
        """

# --- Usage examples ---

from cassandra.cluster import Cluster
# Authentication examples.
from cassandra.auth import PlainTextAuthProvider, SaslAuthProvider

# Basic username/password authentication
auth_provider = PlainTextAuthProvider(
    username='cassandra_user',
    password='secure_password'
)
cluster = Cluster(
    contact_points=['127.0.0.1'],
    auth_provider=auth_provider
)

# SASL authentication (requires puresasl package)
sasl_auth = SaslAuthProvider(
    mechanism='GSSAPI',
    service='cassandra',
    qops=['auth']
)
cluster_sasl = Cluster(
    contact_points=['127.0.0.1'],
    auth_provider=sasl_auth
)
from cassandra.policies import (
DCAwareRoundRobinPolicy,
TokenAwarePolicy,
WhiteListRoundRobinPolicy,
HostDistance
)
# Datacenter-aware load balancing
dc_aware_policy = DCAwareRoundRobinPolicy(
local_dc='datacenter1',
used_hosts_per_remote_dc=2 # Use 2 hosts from each remote DC
)
# Token-aware routing with DC-aware fallback
token_aware_policy = TokenAwarePolicy(dc_aware_policy)
# Whitelist policy for specific hosts only
whitelist_policy = WhiteListRoundRobinPolicy([
'192.168.1.10',
'192.168.1.11',
'192.168.1.12'
])
cluster = Cluster(
contact_points=['192.168.1.10'],
load_balancing_policy=token_aware_policy
)
# Custom load balancing policy
class CustomLoadBalancingPolicy(LoadBalancingPolicy):
def distance(self, host):
# Mark hosts in 192.168.1.x as local, others as remote
if host.address.startswith('192.168.1.'):
return HostDistance.LOCAL
else:
return HostDistance.REMOTE
def make_query_plan(self, working_keyspace=None, query=None):
# Custom host selection logic
local_hosts = [h for h in self.hosts if self.distance(h) == HostDistance.LOCAL]
remote_hosts = [h for h in self.hosts if self.distance(h) == HostDistance.REMOTE]
# Return local hosts first, then remote
return local_hosts + remote_hosts[:2] # Max 2 remote hosts
custom_policy = CustomLoadBalancingPolicy()
cluster = Cluster(
contact_points=['192.168.1.10'],
load_balancing_policy=custom_policy
)from cassandra.policies import (
FallthroughRetryPolicy,
DowngradingConsistencyRetryPolicy,
RetryPolicy,
RetryDecision
)
from cassandra import ConsistencyLevel
# Use fallthrough policy (no retries)
fallthrough_policy = FallthroughRetryPolicy()
# Use downgrading consistency policy
downgrading_policy = DowngradingConsistencyRetryPolicy()
cluster = Cluster(
contact_points=['127.0.0.1'],
default_retry_policy=downgrading_policy
)
# Custom retry policy
class AggressiveRetryPolicy(RetryPolicy):
def on_read_timeout(self, query, consistency_level, required_responses,
received_responses, data_retrieved, retry_num):
if retry_num < 3: # Retry up to 3 times
if data_retrieved:
# Retry with ONE if we got some data
return (RetryDecision.RETRY, ConsistencyLevel.ONE)
elif received_responses > 0:
# Retry with lower consistency if we got any response
return (RetryDecision.RETRY, ConsistencyLevel.ONE)
# Give up after 3 retries
return (RetryDecision.RETHROW, None)
def on_write_timeout(self, query, consistency_level, write_type,
required_responses, received_responses, retry_num):
if retry_num < 2 and write_type == 'SIMPLE':
# Retry simple writes once with ONE
return (RetryDecision.RETRY, ConsistencyLevel.ONE)
return (RetryDecision.RETHROW, None)
aggressive_policy = AggressiveRetryPolicy()
# Apply retry policy to cluster or individual statements
cluster.default_retry_policy = aggressive_policy
# Or apply to specific statements
from cassandra.query import SimpleStatement
stmt = SimpleStatement("SELECT * FROM users WHERE id = %s")
stmt.retry_policy = aggressive_policyfrom cassandra.policies import (
ConstantReconnectionPolicy,
ExponentialReconnectionPolicy
)
# Constant delay reconnection
constant_policy = ConstantReconnectionPolicy(
delay=5.0, # Wait 5 seconds between attempts
max_attempts=10 # Try up to 10 times
)
# Exponential backoff reconnection
exponential_policy = ExponentialReconnectionPolicy(
base_delay=1.0, # Start with 1 second
max_delay=60.0, # Cap at 60 seconds
max_attempts=None # Retry indefinitely
)
cluster = Cluster(
contact_points=['127.0.0.1'],
reconnection_policy=exponential_policy
)
# Custom reconnection policy
class CustomReconnectionPolicy(ReconnectionPolicy):
def __init__(self, delays):
self.delays = delays
def new_schedule(self):
# Use custom delay sequence
for delay in self.delays:
yield delay
# After custom sequence, use constant 30 second delays
while True:
yield 30.0
custom_reconnect = CustomReconnectionPolicy([1, 2, 5, 10, 15, 20])
cluster = Cluster(
contact_points=['127.0.0.1'],
reconnection_policy=custom_reconnect
)from cassandra.policies import HostStateListener
class MyHostStateListener(HostStateListener):
    # Example listener: logs every host state transition; hook points for
    # application-level notifications and alerting are noted inline.
    def on_add(self, host):
        print(f"Host added: {host.address}")
    def on_up(self, host):
        print(f"Host came online: {host.address}")
        # Could trigger application-level notifications
    def on_down(self, host):
        print(f"Host went offline: {host.address}")
        # Could trigger alerting systems
    def on_remove(self, host):
        print(f"Host removed: {host.address}")

# Register the listener
cluster = Cluster(contact_points=['127.0.0.1'])
cluster.register_listener(MyHostStateListener())
session = cluster.connect()
# The listener will now receive notifications about host state changes
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
    DCAwareRoundRobinPolicy,
    TokenAwarePolicy,
    DowngradingConsistencyRetryPolicy,
    ExponentialReconnectionPolicy,
    SimpleConvictionPolicy
)

# Complete production configuration
auth_provider = PlainTextAuthProvider(
    username='app_user',
    password='secure_password'
)
# Token-aware routing on top of DC-aware round-robin keeps queries on
# local replicas whenever possible.
load_balancing_policy = TokenAwarePolicy(
    DCAwareRoundRobinPolicy(
        local_dc='DC1',
        used_hosts_per_remote_dc=1
    )
)
retry_policy = DowngradingConsistencyRetryPolicy()
reconnection_policy = ExponentialReconnectionPolicy(
    base_delay=1.0,
    max_delay=60.0
)
conviction_policy = SimpleConvictionPolicy()
cluster = Cluster(
    contact_points=['10.0.1.1', '10.0.1.2', '10.0.1.3'],
    port=9042,
    auth_provider=auth_provider,
    load_balancing_policy=load_balancing_policy,
    default_retry_policy=retry_policy,
    reconnection_policy=reconnection_policy,
    conviction_policy=conviction_policy,
    compression=True,
    protocol_version=4
)
session = cluster.connect('my_keyspace')
# Install with Tessl CLI
npx tessl i tessl/pypi-cassandra-driver