Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Statement types, query execution, batch operations, and result handling with comprehensive parameter binding and tracing support. The query execution system provides both simple string queries and prepared statements for optimal performance.
Different statement types for various query execution patterns and performance requirements.
class Statement:
    """Abstract base for all executable statement types.

    Carries the per-statement execution options (consistency, retries,
    timeout, tracing, and paging) shared by SimpleStatement,
    PreparedStatement, BoundStatement, and BatchStatement.

    Note: in this API-stub form every accessor is a documented
    placeholder — getters yield None and setters are no-ops.
    """

    def __init__(self):
        """Initialize a bare statement with driver defaults."""

    @property
    def consistency_level(self):
        """int: Consistency level applied when this statement runs."""

    @consistency_level.setter
    def consistency_level(self, value):
        """Assign the consistency level for this statement."""

    @property
    def serial_consistency_level(self):
        """int: Serial consistency used by conditional (LWT) statements."""

    @serial_consistency_level.setter
    def serial_consistency_level(self, value):
        """Assign the serial consistency level."""

    @property
    def retry_policy(self):
        """RetryPolicy: Policy consulted when execution fails."""

    @retry_policy.setter
    def retry_policy(self, value):
        """Assign the retry policy."""

    @property
    def timeout(self):
        """float: Client-side timeout for this statement, in seconds."""

    @timeout.setter
    def timeout(self, value):
        """Assign the execution timeout."""

    @property
    def trace(self):
        """bool: True when query tracing is requested for this statement."""

    @trace.setter
    def trace(self, value):
        """Turn query tracing on or off."""

    @property
    def fetch_size(self):
        """int: Rows fetched per page when paging results."""

    @fetch_size.setter
    def fetch_size(self, value):
        """Assign the per-page fetch size."""

    @property
    def paging_state(self):
        """bytes: Opaque token for resuming a paged result set."""

    @paging_state.setter
    def paging_state(self, value):
        """Assign the paging state token."""
class SimpleStatement(Statement):
    # One-shot CQL statement: the query text is sent and parsed on every
    # execution (prefer PreparedStatement for queries run repeatedly).
    def __init__(self, query_string, consistency_level=None, serial_consistency_level=None, fetch_size=None):
        """
        A simple CQL query string statement.

        Parameters:
        - query_string (str): CQL query with optional parameter placeholders
          (examples in this file use %s-style placeholders)
        - consistency_level (int): Consistency level for this query
        - serial_consistency_level (int): Serial consistency for conditional queries
        - fetch_size (int): Number of rows to fetch per page
        """

    @property
    def query_string(self):
        """str: The CQL query string"""
class PreparedStatement(Statement):
    # Server-side compiled statement; reusing it avoids re-parsing the
    # CQL text on each execution and enables token-aware routing.
    def __init__(self, column_metadata, query_id, routing_key_indexes, query_string, keyspace, protocol_version, session):
        """
        A prepared statement with server-side query compilation.

        Note: PreparedStatement objects are created by Session.prepare(), not directly instantiated.
        """

    def bind(self, values):
        """
        Bind parameter values to create a BoundStatement.

        Parameters:
        - values (list, tuple, or dict): Parameter values to bind

        Returns:
        BoundStatement: Statement with bound parameters ready for execution
        """

    @property
    def query_string(self):
        """str: The original CQL query string"""

    @property
    def column_metadata(self):
        """list: Metadata for query parameters and result columns"""

    @property
    def query_id(self):
        """bytes: Server-assigned query identifier"""

    @property
    def routing_key_indexes(self):
        """list: Indexes of parameters used for routing"""
class BoundStatement(Statement):
    # A PreparedStatement plus concrete parameter values; this is the
    # object actually submitted to Session.execute().
    def __init__(self, prepared_statement, values=None):
        """
        A prepared statement with bound parameter values.

        Parameters:
        - prepared_statement (PreparedStatement): The prepared statement to bind
        - values (list, tuple, or dict): Parameter values
        """

    @property
    def prepared_statement(self):
        """PreparedStatement: The underlying prepared statement"""

    @property
    def values(self):
        """list: Bound parameter values"""

    @property
    def routing_key(self):
"""bytes: Routing key for token-aware load balancing"""Batch multiple statements together for atomic execution or performance optimization.
class BatchType:
    """Enumeration of batch execution modes.

    Values mirror the CQL protocol batch-type codes.
    """

    # Atomic batch recorded in the batch log (default).
    LOGGED = 0
    # Skips the batch log: faster, but not atomic.
    UNLOGGED = 1
    # Required mode for batches touching counter columns.
    COUNTER = 2
class BatchStatement(Statement):
    # Groups several statements into one request; atomicity depends on
    # the chosen batch_type (see BatchType).
    def __init__(self, batch_type=BatchType.LOGGED, consistency_level=None, serial_consistency_level=None):
        """
        A batch of multiple statements executed atomically.

        Parameters:
        - batch_type (int): Type of batch operation (LOGGED, UNLOGGED, or COUNTER)
        - consistency_level (int): Consistency level for the batch
        - serial_consistency_level (int): Serial consistency for conditional statements
        """

    def add(self, statement, parameters=None):
        """
        Add a statement to the batch.

        Parameters:
        - statement (str, SimpleStatement, PreparedStatement, or BoundStatement): Statement to add
        - parameters (list, tuple, or dict): Parameters if statement is a string
        """

    def add_all(self, statements_and_parameters):
        """
        Add multiple statements to the batch.

        Parameters:
        - statements_and_parameters (iterable): Sequence of (statement, parameters) tuples
        """

    def clear(self):
        """Remove all statements from the batch."""

    @property
    def size(self):
        """int: Number of statements in the batch"""

    @property
    def batch_type(self):
"""int: The batch type (LOGGED, UNLOGGED, or COUNTER)"""Functions for customizing how query result rows are represented.
def tuple_factory(colnames, rows):
    """
    Row factory that returns rows as tuples.

    Parameters:
    - colnames (list): Column names from the result set (unused here;
      kept for row-factory signature compatibility)
    - rows (list): Raw row data from the database

    Returns:
    list: List of tuples representing rows
    """
    # The stub returned None despite documenting a list of tuples;
    # materialize each raw row as a tuple.
    return [tuple(row) for row in rows]
def named_tuple_factory(colnames, rows):
    """
    Row factory that returns rows as named tuples (default).

    Parameters:
    - colnames (list): Column names from the result set
    - rows (list): Raw row data from the database

    Returns:
    list: List of named tuples with column names as attributes
    """
    from collections import namedtuple

    # The stub returned None despite documenting a list of named tuples.
    # rename=True keeps this robust when a column name is not a valid
    # Python identifier (e.g. aggregates such as "count(*)").
    row_cls = namedtuple("Row", colnames, rename=True)
    return [row_cls(*row) for row in rows]
def dict_factory(colnames, rows):
    """
    Row factory that returns rows as dictionaries.

    Parameters:
    - colnames (list): Column names from the result set
    - rows (list): Raw row data from the database

    Returns:
    list: List of dictionaries with column names as keys
    """
    # The stub returned None despite documenting a list of dicts;
    # pair each row's values with the column names.
    return [dict(zip(colnames, row)) for row in rows]
def ordered_dict_factory(colnames, rows):
    """
    Row factory that returns rows as OrderedDict objects.

    Parameters:
    - colnames (list): Column names from the result set
    - rows (list): Raw row data from the database

    Returns:
    list: List of OrderedDict objects preserving column order
    Note: key order follows the order of colnames.
"""Query execution tracing for performance analysis and debugging.
class QueryTrace:
    """Server-side trace details for a single executed query.

    Instances are normally obtained from a result set rather than
    constructed by hand; the session is retained so trace rows can be
    fetched from the server.
    """

    def __init__(self, trace_id, session):
        """Record the trace identity and the session used to load it.

        Parameters:
        - trace_id (UUID): Unique identifier for this trace
        - session (Session): Session used to fetch trace details
        """

    @property
    def trace_id(self):
        """UUID: Unique identifier for this trace."""

    @property
    def request_type(self):
        """str: Kind of request that was traced."""

    @property
    def duration(self):
        """int: Total query duration in microseconds."""

    @property
    def coordinator(self):
        """str: Address of the node that coordinated the query."""

    @property
    def parameters(self):
        """dict: Parameters recorded with the query."""

    @property
    def started_at(self):
        """datetime: Timestamp at which execution began."""

    @property
    def events(self):
        """list: TraceEvent entries describing each execution step."""
class TraceEvent:
    """One step within a QueryTrace.

    Captures what happened, where, and how long after the query began.
    """

    def __init__(self, description, datetime, source, source_elapsed):
        """Record a single trace event.

        Parameters:
        - description (str): Description of this trace event
        - datetime (datetime): When this event occurred
        - source (str): Address of the node where this event occurred
        - source_elapsed (int): Elapsed microseconds since query start
        """

    @property
    def description(self):
        """str: Human-readable description of the event."""

    @property
    def datetime(self):
        """datetime: Wall-clock time at which the event occurred."""

    @property
    def source(self):
        """str: Address of the node that produced the event."""

    @property
    def source_elapsed(self):
        """int: Microseconds elapsed since the query started."""
class TraceUnavailable(Exception):
    """Raised when trace data cannot be retrieved for a query."""
    pass
Utilities for binding parameters to query strings.
def bind_params(query, params, encoder):
    """
    Bind parameters to a query string.

    Parameters:
    - query (str): CQL query string with parameter placeholders
    - params (list, tuple, or dict): Parameter values to bind
    - encoder (Encoder): Encoder instance for parameter serialization

    Returns:
    str: Query string with parameters substituted

    Note: This is primarily used internally by the driver.
    NOTE(review): values are presumably serialized via the supplied
    encoder before substitution -- confirm against the driver docs.
"""from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel

# --- Example: executing a SimpleStatement with parameters ---

# Create a simple statement
statement = SimpleStatement(
    "SELECT * FROM users WHERE age > %s",
    consistency_level=ConsistencyLevel.ONE
)

# Execute with parameters
result = session.execute(statement, [25])
for row in result:
    print(f"User: {row.name}, Age: {row.age}")

# Statement with fetch size for paging
paged_statement = SimpleStatement(
    "SELECT * FROM large_table",
    fetch_size=1000
)
result = session.execute(paged_statement)
for row in result:
    # Results are automatically paged
    print(row)
# Prepare a statement for repeated execution
# --- Example: prepared statements for repeated execution ---
insert_user = session.prepare("""
INSERT INTO users (id, name, email, age, created_at)
VALUES (?, ?, ?, ?, ?)
""")

# Bind and execute multiple times
import uuid
from datetime import datetime

for i in range(100):
    # Explicit bind() yields a BoundStatement that can be inspected
    # before execution.
    bound_stmt = insert_user.bind([
        uuid.uuid4(),
        f'User {i}',
        f'user{i}@example.com',
        20 + (i % 50),
        datetime.now()
    ])
    session.execute(bound_stmt)

# Or execute directly with parameters
session.execute(insert_user, [
    uuid.uuid4(),
    'Alice Smith',
    'alice@example.com',
    30,
    datetime.now()
])
from cassandra.query import BatchStatement, BatchType
# --- Example: batch operations ---

# Create a logged batch (atomic)
batch = BatchStatement(batch_type=BatchType.LOGGED)

# Add multiple insert statements
batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Alice'])
batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Bob'])
batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Charlie'])

# Execute the batch atomically
session.execute(batch)

# Counter batch for counter updates
counter_batch = BatchStatement(batch_type=BatchType.COUNTER)
counter_batch.add(SimpleStatement("UPDATE counters SET count = count + 1 WHERE id = %s"), [1])
counter_batch.add(SimpleStatement("UPDATE counters SET count = count + 5 WHERE id = %s"), [2])
session.execute(counter_batch)

# Unlogged batch for performance (not atomic)
unlogged_batch = BatchStatement(batch_type=BatchType.UNLOGGED)
for i in range(10):
    unlogged_batch.add(
        SimpleStatement("INSERT INTO logs (id, message) VALUES (%s, %s)"),
        [uuid.uuid4(), f'Log message {i}']
    )
session.execute(unlogged_batch)
from cassandra.query import dict_factory, ordered_dict_factory
# --- Example: customizing row representation via row factories ---

# Use dictionary row factory
session.row_factory = dict_factory
result = session.execute("SELECT id, name, age FROM users LIMIT 5")
for row in result:
    print(f"User {row['name']} (ID: {row['id']}) is {row['age']} years old")

# Use ordered dictionary factory to preserve column order
session.row_factory = ordered_dict_factory
result = session.execute("SELECT * FROM users LIMIT 5")
for row in result:
    print("Column order:", list(row.keys()))
    print("Values:", list(row.values()))

# Custom row factory
def custom_row_factory(colnames, rows):
    """Convert rows to custom objects"""
    class UserRow:
        def __init__(self, **kwargs):
            for key, value in kwargs.items():
                setattr(self, key, value)

        def __str__(self):
            return f"User({self.name}, {self.age})"

    return [UserRow(**dict(zip(colnames, row))) for row in rows]

session.row_factory = custom_row_factory
result = session.execute("SELECT name, age FROM users LIMIT 3")
for user in result:
    print(user) # Uses custom __str__ method
# Enable tracing for a specific query
statement = SimpleStatement("SELECT * FROM users WHERE id = %s")
statement.trace = True
result = session.execute(statement, [user_id])
# Get the trace information
trace = result.get_query_trace(max_wait=5.0)
print(f"Query duration: {trace.duration} microseconds")
print(f"Coordinator: {trace.coordinator}")
print(f"Request type: {trace.request_type}")
# Print all trace events
for event in trace.events:
print(f"[{event.source_elapsed:>6}μs] {event.source}: {event.description}")
# Enable tracing for all queries in a session
session.default_trace = True
result = session.execute("SELECT COUNT(*) FROM users")
trace = result.get_query_trace()
print(f"Count query took {trace.duration} microseconds")# Automatic paging with fetch_size
statement = SimpleStatement("SELECT * FROM large_table", fetch_size=1000)
result = session.execute(statement)
# Iterate through all rows (automatic paging)
row_count = 0
for row in result:
row_count += 1
if row_count % 1000 == 0:
print(f"Processed {row_count} rows...")
# Manual paging control
statement = SimpleStatement("SELECT * FROM large_table", fetch_size=100)
result = session.execute(statement)
while True:
# Process current page
for row in result.current_rows:
process_row(row)
# Check if more pages available
if result.has_more_pages:
# Fetch next page
result = session.execute(statement, paging_state=result.paging_state)
else:
        break
from cassandra import ConsistencyLevel
from cassandra.policies import FallthroughRetryPolicy

# --- Example: configuring per-statement execution options ---

# Configure statement properties
statement = SimpleStatement("SELECT * FROM users WHERE region = %s")
statement.consistency_level = ConsistencyLevel.LOCAL_QUORUM
statement.serial_consistency_level = ConsistencyLevel.LOCAL_SERIAL
statement.timeout = 30.0
statement.retry_policy = FallthroughRetryPolicy()
statement.fetch_size = 5000

# Execute configured statement
result = session.execute(statement, ['US-East'])

# Same configuration for prepared statements
prepared = session.prepare("UPDATE users SET email = ? WHERE id = ?")
prepared.consistency_level = ConsistencyLevel.QUORUM
prepared.timeout = 10.0
bound = prepared.bind(['new@example.com', user_id])
session.execute(bound)
Install with Tessl CLI
npx tessl i tessl/pypi-cassandra-driver