Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core cluster connection management, session handling, and connection pooling functionality. The Cluster and Session classes form the foundation of all cassandra-driver operations.
The Cluster class manages connections to multiple Cassandra nodes, handles topology changes, and provides load balancing across the cluster.
class Cluster:
    """Manage connections to a Cassandra cluster.

    A Cluster maintains connections to multiple Cassandra nodes, reacts to
    topology changes, and load-balances requests across the cluster.  Use
    connect() to obtain a Session for executing queries.
    """

    def __init__(
        self,
        contact_points=None,
        port=9042,
        executor_threads=2,
        auth_provider=None,
        load_balancing_policy=None,
        reconnection_policy=None,
        default_retry_policy=None,
        conviction_policy=None,
        metrics_enabled=False,
        connection_class=None,
        ssl_context=None,
        ssl_options=None,
        sockopts=None,
        cql_version=None,
        protocol_version=None,
        is_default_protocol_version=None,
        compression=True,
        max_schema_agreement_wait=10,
        control_connection_timeout=2.0,
        idle_heartbeat_interval=30,
        idle_heartbeat_timeout=60,
        schema_event_refresh_window=2,
        topology_event_refresh_window=10,
        status_event_refresh_window=2,
        prepare_on_all_hosts=True,
        reprepare_on_up=True,
        max_requests_per_connection=None,
        **kwargs
    ):
        """
        Initialize a Cluster instance to manage Cassandra connections.

        Parameters:
        - contact_points (list): List of host addresses to initially connect to
        - port (int): Port number for Cassandra connections (default: 9042)
        - executor_threads (int): Number of threads for I/O operations
        - auth_provider (AuthProvider): Authentication provider for credentials
        - load_balancing_policy (LoadBalancingPolicy): Policy for host selection
        - reconnection_policy (ReconnectionPolicy): Policy for reconnection delays
        - default_retry_policy (RetryPolicy): Default retry policy for failed operations
        - conviction_policy (ConvictionPolicy): Policy for marking hosts as failed
        - metrics_enabled (bool): Enable connection and request metrics
        - connection_class: Connection implementation class
        - ssl_context: SSL context for encrypted connections
        - ssl_options (dict): SSL configuration options
        - sockopts (list): Socket options tuples
        - cql_version (str): CQL version to use
        - protocol_version (int): Native protocol version (1-4)
        - is_default_protocol_version (bool): NOTE(review) presumably marks
          whether protocol_version was defaulted rather than set explicitly
          -- confirm against upstream driver docs
        - compression (bool or str): Enable compression ('snappy', 'lz4', or True)
        - max_schema_agreement_wait (float): Max time to wait for schema agreement
        - control_connection_timeout (float): Timeout for control connection operations
        - idle_heartbeat_interval (float): Interval between heartbeat messages
        - idle_heartbeat_timeout (float): Timeout for heartbeat responses
        - schema_event_refresh_window (float): presumably a debounce window for
          schema change events -- confirm against upstream driver docs
        - topology_event_refresh_window (float): presumably a debounce window
          for topology change events -- confirm
        - status_event_refresh_window (float): presumably a debounce window for
          host up/down status events -- confirm
        - prepare_on_all_hosts (bool): Prepare statements on all hosts
        - reprepare_on_up (bool): Re-prepare statements when hosts come back up
        - max_requests_per_connection (int): Max concurrent requests per connection
        - kwargs: Additional driver-specific options, passed through unchanged
        """

    def connect(self, keyspace=None):
        """
        Create a new Session for this cluster.

        Parameters:
        - keyspace (str): Default keyspace for the session

        Returns:
        Session: A new session connected to the cluster
        """

    def shutdown(self):
        """
        Shut down this cluster and all associated sessions.

        Closes all connections and stops background threads.
        """

    def add_host(self, address, datacenter=None, rack=None, signal=True):
        """
        Add a host to the cluster.

        Parameters:
        - address (str): Host address to add
        - datacenter (str): Datacenter name
        - rack (str): Rack name
        - signal (bool): Whether to signal policy changes
        """

    def remove_host(self, host):
        """
        Remove a host from the cluster.

        Parameters:
        - host (Host): Host object to remove
        """

    @property
    def metadata(self):
        """Metadata: Cluster metadata including keyspace and table information"""

    @property
    def metrics(self):
        """Metrics: Connection and request metrics if enabled"""


# The Session class executes queries and manages prepared statements within
# a keyspace context.
class Session:
    """Execute queries and manage prepared statements within a keyspace."""

    def execute(self, query, parameters=None, timeout=None, trace=False):
        """
        Execute a query synchronously.

        Parameters:
        - query (str or Statement): CQL query string or Statement object
        - parameters (list or dict): Query parameters for placeholder binding
        - timeout (float): Query timeout in seconds
        - trace (bool): Enable query tracing

        Returns:
        ResultSet: Query results with row data and metadata

        Raises:
        - Unavailable: Not enough replicas available
        - ReadTimeout: Read operation timed out
        - WriteTimeout: Write operation timed out
        - InvalidRequest: Invalid query or parameters
        """

    def execute_async(self, query, parameters=None, trace=False):
        """
        Execute a query asynchronously.

        Parameters:
        - query (str or Statement): CQL query string or Statement object
        - parameters (list or dict): Query parameters for placeholder binding
        - trace (bool): Enable query tracing

        Returns:
        ResponseFuture: Future object for asynchronous result handling
        """

    def prepare(self, query):
        """
        Prepare a query for efficient repeated execution.

        Parameters:
        - query (str): CQL query string with parameter placeholders

        Returns:
        PreparedStatement: Prepared statement object for binding and execution
        """

    def shutdown(self):
        """Shut down this session and close all connections."""

    def set_keyspace(self, keyspace):
        """
        Set the default keyspace for this session.

        Parameters:
        - keyspace (str): Keyspace name to use as default
        """

    @property
    def keyspace(self):
        """str: Current default keyspace for this session"""

    @property
    def cluster(self):
        """Cluster: The cluster this session is connected to"""

    @property
    def hosts(self):
        """set: Set of Host objects in the cluster"""

    @property
    def user_type_deserializers(self):
        """dict: User-defined type deserializers"""

    @property
    def encoder(self):
        """Encoder: Parameter encoder for this session"""

    @property
    def row_factory(self):
        """callable: Factory function for creating row objects from results"""

    @row_factory.setter
    def row_factory(self, factory):
        """Set the row factory for result processing"""

    @property
    def default_timeout(self):
        """float: Default timeout for queries executed by this session"""

    @default_timeout.setter
    def default_timeout(self, timeout):
        """Set the default timeout for queries"""

    @property
    def default_consistency_level(self):
        """int: Default consistency level for queries"""

    @default_consistency_level.setter
    def default_consistency_level(self, consistency_level):
        """Set the default consistency level"""

    @property
    def default_serial_consistency_level(self):
        """int: Default serial consistency level for conditional queries"""

    @default_serial_consistency_level.setter
    def default_serial_consistency_level(self, serial_consistency_level):
        """Set the default serial consistency level"""


# Asynchronous response objects for non-blocking query execution.
class ResponseFuture:
    """Future returned by Session.execute_async for non-blocking results."""

    def result(self, timeout=None):
        """
        Block until the query completes and return the result.

        Parameters:
        - timeout (float): Maximum time to wait for result

        Returns:
        ResultSet: Query results

        Raises:
        - Timeout: Operation timed out
        - Various query-specific exceptions
        """

    def get_query_trace(self, max_wait=2.0):
        """
        Get the query trace if tracing was enabled.

        Parameters:
        - max_wait (float): Maximum time to wait for trace data

        Returns:
        QueryTrace: Trace information for the executed query
        """

    def add_callback(self, fn, *args, **kwargs):
        """
        Add a callback function to be called when query completes successfully.

        Parameters:
        - fn (callable): Function to call with (result, *args, **kwargs)
        - args: Additional positional arguments for callback
        - kwargs: Additional keyword arguments for callback
        """

    def add_errback(self, fn, *args, **kwargs):
        """
        Add an error callback function to be called if query fails.

        Parameters:
        - fn (callable): Function to call with (exception, *args, **kwargs)
        - args: Additional positional arguments for callback
        - kwargs: Additional keyword arguments for callback
        """

    def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):
        """
        Add both success and error callbacks.

        Parameters:
        - callback (callable): Success callback function
        - errback (callable): Error callback function
        - callback_args (tuple): Arguments for success callback
        - callback_kwargs (dict): Keyword arguments for success callback
        - errback_args (tuple): Arguments for error callback
        - errback_kwargs (dict): Keyword arguments for error callback
        """

    @property
    def query(self):
        """str or Statement: The query that was executed"""

    @property
    def session(self):
        """Session: The session used to execute the query"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages of results available"""


# Result sets and paging support for handling large query results.
class ResultSet:
    """Pageable collection of rows returned by a query."""

    def __iter__(self):
        """Iterate over result rows"""

    def __len__(self):
        """Get the number of rows in current page"""

    def __getitem__(self, index):
        """Get a row by index"""

    @property
    def current_rows(self):
        """list: Rows in the current page"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages available"""

    @property
    def response_future(self):
        """ResponseFuture: Future object used to fetch this result"""

    @property
    def column_names(self):
        """list: Names of columns in the result set"""

    @property
    def column_types(self):
        """list: CQL types of columns in the result set"""
class PagedResult:
    """Iterator that transparently walks rows across all result pages."""

    def __init__(self, future, session):
        """
        Initialize a paged result iterator.

        Parameters:
        - future (ResponseFuture): Initial response future
        - session (Session): Session for fetching additional pages
        """

    def __iter__(self):
        """Iterate over all rows across all pages"""


# Host and connection pool management for optimal performance.
class Host:
    """Represent a single Cassandra node in the cluster."""

    def __init__(self, address, datacenter=None, rack=None):
        """
        Represent a Cassandra host in the cluster.

        Parameters:
        - address (str): Host IP address or hostname
        - datacenter (str): Datacenter name
        - rack (str): Rack name
        """

    @property
    def address(self):
        """str: Host address"""

    @property
    def datacenter(self):
        """str: Datacenter name"""

    @property
    def rack(self):
        """str: Rack name"""

    @property
    def is_up(self):
        """bool: Whether the host is currently up"""

    @property
    def release_version(self):
        """str: Cassandra release version on this host"""
class HostConnectionPool:
    """Pool of connections to a single host, owned by a Session."""

    def __init__(self, host, host_distance, session):
        """
        Manage connections to a specific host.

        Parameters:
        - host (Host): Target host
        - host_distance (int): Distance category for this host
        - session (Session): Parent session
        """

    def borrow_connection(self, timeout):
        """
        Borrow a connection from the pool.

        Parameters:
        - timeout (float): Maximum time to wait for connection

        Returns:
        Connection: Available connection object
        """

    def return_connection(self, connection):
        """
        Return a connection to the pool.

        Parameters:
        - connection (Connection): Connection to return
        """

    @property
    def host(self):
        """Host: The host this pool connects to"""

    @property
    def is_shutdown(self):
        """bool: Whether this pool has been shut down"""

    @property
    def open_count(self):
        """int: Number of open connections in the pool"""


# Specific exceptions for cluster and session operations.
class NoHostAvailable(Exception):
    """No hosts are available for connection."""

    def __init__(self, message, errors):
        """
        Parameters:
        - message (str): Error message
        - errors (dict): Dict mapping Host objects to their connection errors
        """

    @property
    def errors(self):
        """dict: Connection errors by host"""
class QueryExhausted(Exception):
    """All query retries have been exhausted."""

    def __init__(self, message, last_host):
        """
        Parameters:
        - message (str): Error message
        - last_host (Host): Last host that was tried
        """
class UserTypeDoesNotExist(Exception):
    """Referenced user-defined type does not exist."""

    def __init__(self, keyspace, user_type):
        """
        Parameters:
        - keyspace (str): Keyspace name
        - user_type (str): User-defined type name
        """


# --- Usage example -----------------------------------------------------------
from cassandra.cluster import Cluster
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy

import uuid
from datetime import datetime

# Setup authentication
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')

# Setup load balancing
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='datacenter1')

# Create cluster with custom configuration
cluster = Cluster(
    contact_points=['127.0.0.1', '127.0.0.2'],
    port=9042,
    auth_provider=auth_provider,
    load_balancing_policy=load_balancing_policy,
    protocol_version=4
)

# Connect and execute queries
session = cluster.connect()
session.set_keyspace('my_keyspace')

# Example id; in a real application this would come from user input or data.
user_id = uuid.uuid4()

# Execute synchronous query
result = session.execute("SELECT * FROM users WHERE id = %s", [user_id])
for row in result:
    print(f"User: {row.name}")

# Execute asynchronous query
future = session.execute_async("SELECT * FROM users")
result = future.result()

# Prepare a statement for repeated execution
insert_stmt = session.prepare("""
INSERT INTO users (id, name, email, created_at)
VALUES (?, ?, ?, ?)
""")

# Execute with different parameters
session.execute(insert_stmt, [
    uuid.uuid4(),
    'Alice Smith',
    'alice@example.com',
    datetime.now()
])
session.execute(insert_stmt, [
    uuid.uuid4(),
    'Bob Jones',
    'bob@example.com',
    datetime.now()
])


def handle_success(result):
    print(f"Query succeeded with {len(result)} rows")
    for row in result:
        print(f"User: {row.name}")


def handle_error(exception):
    print(f"Query failed: {exception}")


# Execute with callbacks
future = session.execute_async("SELECT * FROM users")
future.add_callback(handle_success)
future.add_errback(handle_error)

# Or wait for result
try:
    result = future.result(timeout=10.0)
    print(f"Got {len(result)} rows")
except Exception as e:
    print(f"Query failed: {e}")

# Clean up: shut down only after all work with the session is finished
# (the original snippet called shutdown() before the prepared-statement
# and callback examples, which would use a closed session).
cluster.shutdown()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-cassandra-driver