Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Object-relational mapping system with Django-inspired model definitions, query operations, and schema management. CQLEngine provides a high-level interface for working with Cassandra data through Python objects.
Connection management and configuration for CQLEngine operations.
def setup(hosts, default_keyspace, consistency=ConsistencyLevel.ONE, lazy_connect=False, retry_connect=False, **kwargs):
"""
Setup CQLEngine connection to Cassandra cluster.
Parameters:
- hosts (list): List of host addresses
- default_keyspace (str): Default keyspace name (required)
- consistency (int): Default consistency level (default: ConsistencyLevel.ONE)
- lazy_connect (bool): Whether to connect lazily
- retry_connect (bool): Whether to retry failed connections
- kwargs: Additional cluster configuration options
Note: Must be called before using any CQLEngine models.
"""
def execute(query, parameters=None, consistency_level=None, timeout=None):
"""
Execute a CQL query directly.
Parameters:
- query (str): CQL query string
- parameters (list or dict): Query parameters
- consistency_level (int): Consistency level for this query
- timeout (float): Query timeout in seconds
Returns:
ResultSet: Query execution results
"""
def get_session():
"""
Get the current CQLEngine session.
Returns:
Session: Active session object
"""
def get_cluster():
"""
Get the current CQLEngine cluster.
Returns:
Cluster: Active cluster object
"""Base model class and column types for defining Cassandra table models.
class Model:
"""Base class for CQLEngine models."""
def __init__(self, **kwargs):
"""
Initialize model instance with field values.
Parameters:
- kwargs: Field name/value pairs
"""
def save(self):
"""
Save this model instance to Cassandra.
Returns:
Model: The saved model instance
"""
def delete(self):
"""
Delete this model instance from Cassandra.
"""
def update(self, **kwargs):
"""
Update specific fields of this model instance.
Parameters:
- kwargs: Field name/value pairs to update
Returns:
Model: The updated model instance
"""
@classmethod
def create(cls, **kwargs):
"""
Create and save a new model instance.
Parameters:
- kwargs: Field name/value pairs
Returns:
Model: The created model instance
"""
@classmethod
def objects(cls):
"""
Get a QuerySet for this model.
Returns:
ModelQuerySet: QuerySet for querying model instances
"""
@classmethod
def all(cls):
"""
Get all instances of this model.
Returns:
ModelQuerySet: QuerySet containing all instances
"""
@classmethod
def get(cls, **kwargs):
"""
Get a single model instance by primary key.
Parameters:
- kwargs: Primary key field values
Returns:
Model: The matching model instance
Raises:
- DoesNotExist: No matching instance found
- MultipleObjectsReturned: Multiple instances found
"""
@classmethod
def filter(cls, **kwargs):
"""
Filter model instances by field values.
Parameters:
- kwargs: Field filter conditions
Returns:
ModelQuerySet: Filtered QuerySet
"""
def validate(self):
"""
Validate this model instance.
Raises:
- ValidationError: If validation fails
"""
def __table_name__(self):
"""
Get the Cassandra table name for this model.
Returns:
str: Table name
"""
class Meta:
"""Metaclass for model configuration."""
passColumn type definitions for model fields with validation and serialization.
class Column:
"""Base class for all column types."""
def __init__(self, primary_key=False, partition_key=False, clustering_order=None,
required=True, default=None, db_field=None, index=False,
static=False, discriminator_column=False):
"""
Base column initialization.
Parameters:
- primary_key (bool): Whether this is a primary key column
- partition_key (bool): Whether this is part of the partition key
- clustering_order (str): Clustering order ('ASC' or 'DESC')
- required (bool): Whether the field is required
- default: Default value or callable
- db_field (str): Custom database column name
- index (bool): Whether to create a secondary index
- static (bool): Whether this is a static column
- discriminator_column (bool): Whether this is a discriminator column
"""
# Numeric Column Types
class Integer(Column):
"""32-bit signed integer column."""
class BigInt(Column):
"""64-bit signed integer column."""
class SmallInt(Column):
"""16-bit signed integer column."""
class TinyInt(Column):
"""8-bit signed integer column."""
class VarInt(Column):
"""Variable precision integer column."""
class Counter(Column):
"""Counter column (special Cassandra counter type)."""
class Float(Column):
"""32-bit floating point column."""
class Double(Column):
"""64-bit floating point column."""
class Decimal(Column):
"""High-precision decimal column."""
# Text Column Types
class Text(Column):
"""Text/varchar column."""
class Ascii(Column):
"""ASCII-only text column."""
Varchar = Text # Alias for Text
# Binary and Network Types
class Blob(Column):
"""Binary data column."""
class Inet(Column):
"""IP address column."""
# Temporal Column Types
class DateTime(Column):
"""Timestamp column with datetime values."""
class Date(Column):
"""Date column (date only, no time)."""
class Time(Column):
"""Time column (time of day only)."""
class UUID(Column):
"""UUID column."""
class TimeUUID(Column):
"""Time-based UUID column (version 1)."""
# Boolean Type
class Boolean(Column):
"""Boolean column."""
# Collection Column Types
class List(Column):
"""List collection column."""
def __init__(self, value_type, **kwargs):
"""
Parameters:
- value_type (Column): Type of list elements
- kwargs: Additional column options
"""
class Set(Column):
"""Set collection column."""
def __init__(self, value_type, **kwargs):
"""
Parameters:
- value_type (Column): Type of set elements
- kwargs: Additional column options
"""
class Map(Column):
"""Map collection column."""
def __init__(self, key_type, value_type, **kwargs):
"""
Parameters:
- key_type (Column): Type of map keys
- value_type (Column): Type of map values
- kwargs: Additional column options
"""
# Complex Types
class UserDefinedType(Column):
"""User-defined type column."""
def __init__(self, user_type, **kwargs):
"""
Parameters:
- user_type (UserType): User-defined type class
- kwargs: Additional column options
"""QuerySet classes for building and executing queries on models.
class QuerySet:
"""Base QuerySet for model queries."""
def filter(self, **kwargs):
"""
Filter results by field values.
Parameters:
- kwargs: Field filter conditions
Returns:
QuerySet: Filtered QuerySet
"""
def limit(self, count):
"""
Limit the number of results.
Parameters:
- count (int): Maximum number of results
Returns:
QuerySet: Limited QuerySet
"""
def allow_filtering(self):
"""
Allow server-side filtering (ALLOW FILTERING).
Returns:
QuerySet: QuerySet with filtering enabled
"""
def consistency(self, consistency_level):
"""
Set consistency level for this query.
Parameters:
- consistency_level (int): Consistency level
Returns:
QuerySet: QuerySet with specified consistency
"""
def timeout(self, timeout):
"""
Set query timeout.
Parameters:
- timeout (float): Timeout in seconds
Returns:
QuerySet: QuerySet with specified timeout
"""
def count(self):
"""
Get the count of matching records.
Returns:
int: Number of matching records
"""
def get(self):
"""
Get a single result.
Returns:
Model: Single model instance
Raises:
- DoesNotExist: No results found
- MultipleObjectsReturned: Multiple results found
"""
def first(self):
"""
Get the first result.
Returns:
Model: First model instance or None
"""
def all(self):
"""
Get all results.
Returns:
list: List of model instances
"""
def __iter__(self):
"""Iterate over results."""
def __len__(self):
"""Get the number of results."""
class ModelQuerySet(QuerySet):
"""QuerySet specific to model classes."""
def update(self, **kwargs):
"""
Update all matching records.
Parameters:
- kwargs: Field values to update
Returns:
int: Number of updated records
"""
def delete(self):
"""
Delete all matching records.
Returns:
int: Number of deleted records
"""
def values_list(self, *fields, flat=False):
"""
Return specific field values instead of model instances.
Parameters:
- fields: Field names to include
- flat (bool): Return flat list for single field
Returns:
list: List of field value tuples or single values
"""
def only(self, *fields):
"""
Only fetch specified fields.
Parameters:
- fields: Field names to fetch
Returns:
ModelQuerySet: QuerySet with limited fields
"""
class BatchQuery:
"""Batch multiple operations together."""
def __init__(self, batch_type=None, consistency=None, execute_on_exception=False):
"""
Initialize batch query.
Parameters:
- batch_type (int): Type of batch (LOGGED, UNLOGGED, COUNTER)
- consistency (int): Consistency level for the batch
- execute_on_exception (bool): Execute batch even if exception occurs
"""
def save(self, model_instance):
"""
Add model save to the batch.
Parameters:
- model_instance (Model): Model instance to save
"""
def create(self, model_class, **kwargs):
"""
Add model creation to the batch.
Parameters:
- model_class (Model): Model class to create
- kwargs: Field values for new instance
"""
def update(self, model_instance, **kwargs):
"""
Add model update to the batch.
Parameters:
- model_instance (Model): Model instance to update
- kwargs: Field values to update
"""
def delete(self, model_instance):
"""
Add model deletion to the batch.
Parameters:
- model_instance (Model): Model instance to delete
"""
def execute(self):
"""Execute all operations in the batch."""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""Functions for managing keyspaces, tables, and types.
def sync_table(model, keyspaces=None):
"""
Create or update table schema for a model.
Parameters:
- model (Model): Model class to sync
- keyspaces (list): Keyspaces to sync in (default: all configured)
"""
def sync_type(keyspace, user_type_model):
"""
Create or update user-defined type.
Parameters:
- keyspace (str): Keyspace name
- user_type_model (UserType): User type model to sync
"""
def create_keyspace_simple(keyspace_name, replication_factor):
"""
Create keyspace with SimpleStrategy.
Parameters:
- keyspace_name (str): Name of keyspace to create
- replication_factor (int): Replication factor
"""
def create_keyspace_network_topology(keyspace_name, dc_replication_map):
"""
Create keyspace with NetworkTopologyStrategy.
Parameters:
- keyspace_name (str): Name of keyspace to create
- dc_replication_map (dict): Replication factors by datacenter
"""
def drop_keyspace(keyspace_name):
"""
Drop a keyspace.
Parameters:
- keyspace_name (str): Name of keyspace to drop
"""
def drop_table(model):
"""
Drop table for a model.
Parameters:
- model (Model): Model class whose table to drop
"""Support for Cassandra user-defined types in models.
class UserType:
"""Base class for user-defined types."""
def __init__(self, **kwargs):
"""
Initialize UDT instance.
Parameters:
- kwargs: Field name/value pairs
"""
@classmethod
def create(cls, **kwargs):
"""
Create UDT instance.
Parameters:
- kwargs: Field values
Returns:
UserType: Created UDT instance
"""
def validate(self):
"""
Validate UDT instance.
Raises:
- ValidationError: If validation fails
"""CQL functions for use in queries.
def Token(*columns):
"""
Generate token function for partition key columns.
Parameters:
- columns: Column values for token generation
Returns:
Function object for use in queries
"""
def MinTimeUUID(timestamp):
"""
Generate minimum TimeUUID for a timestamp.
Parameters:
- timestamp (datetime): Timestamp for UUID generation
Returns:
Function object for use in queries
"""
def MaxTimeUUID(timestamp):
"""
Generate maximum TimeUUID for a timestamp.
Parameters:
- timestamp (datetime): Timestamp for UUID generation
Returns:
Function object for use in queries
"""CQLEngine-specific exception classes.
class CQLEngineException(Exception):
"""Base exception for CQLEngine operations."""
class ValidationError(CQLEngineException):
"""Field validation error."""
class DoesNotExist(CQLEngineException):
"""Model instance does not exist."""
class MultipleObjectsReturned(CQLEngineException):
"""Multiple instances returned when one expected."""
class LWTException(CQLEngineException):
"""Lightweight transaction (conditional update) failed."""from cassandra.cqlengine import connection, management
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.columns import *
import uuid
# Setup connection
connection.setup(['127.0.0.1'], keyspace='blog', consistency=ConsistencyLevel.ONE)
# Create keyspace
management.create_keyspace_simple('blog', 1)
# Define models
class User(Model):
__keyspace__ = 'blog'
__table_name__ = 'users'
id = UUID(primary_key=True, default=uuid.uuid4)
username = Text(required=True, index=True)
email = Text(required=True)
created_at = DateTime(default=datetime.utcnow)
is_active = Boolean(default=True)
profile = UserDefinedType(user_type='user_profile')
class Meta:
get_pk_field = 'id'
class Post(Model):
__keyspace__ = 'blog'
id = UUID(partition_key=True, default=uuid.uuid4)
created_at = DateTime(primary_key=True, clustering_order='DESC')
author_id = UUID(required=True, index=True)
title = Text(required=True)
content = Text(required=True)
tags = Set(Text)
view_count = Counter()
class Meta:
table_name = 'posts'
# Sync table schemas
management.sync_table(User)
management.sync_table(Post)from datetime import datetime
# Create users
user1 = User.create(
username='alice',
email='alice@example.com'
)
user2 = User(
username='bob',
email='bob@example.com',
created_at=datetime.utcnow()
)
user2.save()
# Read users
alice = User.get(username='alice')
print(f"User: {alice.username} ({alice.email})")
# Update user
alice.email = 'alice.smith@example.com'
alice.save()
# Or update specific fields
alice.update(is_active=False)
# Delete user
inactive_users = User.filter(is_active=False)
for user in inactive_users:
user.delete()# Complex queries
recent_posts = Post.filter(
created_at__gte=datetime.utcnow() - timedelta(days=7)
).limit(10)
for post in recent_posts:
print(f"Post: {post.title} by {post.author_id}")
# Query with filtering
tagged_posts = Post.filter(
tags__contains='python'
).allow_filtering().all()
# Query with specific consistency
important_posts = Post.objects().consistency(ConsistencyLevel.QUORUM).filter(
author_id=user1.id
)
# Count queries
total_posts = Post.objects().count()
alice_posts = Post.filter(author_id=alice.id).count()
# Field selection
usernames = User.objects().values_list('username', flat=True)
user_info = User.objects().values_list('username', 'email')
# Token-based queries (for pagination)
from cassandra.cqlengine.functions import Token
posts_page = Post.filter(
pk__token__gt=Token(last_post_id)
).limit(20)# Create post with tags
post = Post.create(
author_id=alice.id,
title='Python and Cassandra',
content='How to use CQLEngine...',
tags={'python', 'cassandra', 'database'}
)
# Update collections
post.tags.add('tutorial')
post.tags.remove('database')
post.save()
# Query by collection values
python_posts = Post.filter(tags__contains='python').allow_filtering()from cassandra.cqlengine.query import BatchQuery
# Create multiple records in a batch
with BatchQuery() as batch:
batch.create(Post,
author_id=alice.id,
title='First Post',
content='Hello world!')
batch.create(Post,
author_id=alice.id,
title='Second Post',
content='More content...')
# Update multiple records in a batch
with BatchQuery() as batch:
for post in alice_posts:
batch.update(post, view_count=post.view_count + 1)
# Mixed operations in batch
with BatchQuery() as batch:
batch.save(new_user)
batch.update(existing_user, email='new@example.com')
batch.delete(old_user)from cassandra.cqlengine.usertype import UserType
# Define UDT
class UserProfile(UserType):
__keyspace__ = 'blog'
__type_name__ = 'user_profile'
bio = Text()
website = Text()
location = Text()
birth_date = Date()
# Sync UDT
management.sync_type('blog', UserProfile)
# Use UDT in model
class User(Model):
__keyspace__ = 'blog'
id = UUID(primary_key=True, default=uuid.uuid4)
username = Text(required=True)
profile = UserDefinedType(user_type=UserProfile)
# Create user with UDT
profile = UserProfile(
bio='Software developer',
website='https://alice.dev',
location='San Francisco, CA'
)
user = User.create(
username='alice',
profile=profile
)
# Access UDT fields
print(f"Bio: {user.profile.bio}")
print(f"Location: {user.profile.location}")from cassandra.cqlengine.functions import MinTimeUUID, MaxTimeUUID
from datetime import datetime, timedelta
# Query posts from a specific time range
start_time = datetime.utcnow() - timedelta(days=1)
end_time = datetime.utcnow()
# For TimeUUID columns
posts_today = Post.filter(
created_at__gte=MinTimeUUID(start_time),
created_at__lte=MaxTimeUUID(end_time)
)
# Time-based pagination
last_week = datetime.utcnow() - timedelta(days=7)
posts_page = Post.filter(
created_at__gte=MinTimeUUID(last_week)
).limit(50)
for post in posts_page:
print(f"Posted at: {post.created_at}")from cassandra.cqlengine import DoesNotExist, MultipleObjectsReturned, ValidationError
# Handle missing records
try:
user = User.get(username='nonexistent')
except DoesNotExist:
print("User not found")
# Handle multiple results
try:
user = User.get(is_active=True) # Multiple users might be active
except MultipleObjectsReturned:
print("Multiple active users found")
users = User.filter(is_active=True)
user = users.first()
# Handle validation errors
try:
invalid_user = User(username='', email='invalid-email')
invalid_user.validate()
except ValidationError as e:
print(f"Validation failed: {e}")
# Custom validation
class Post(Model):
title = Text(required=True)
content = Text(required=True)
def validate(self):
super().validate()
if len(self.title) < 3:
raise ValidationError("Title must be at least 3 characters")
if len(self.content) < 10:
raise ValidationError("Content must be at least 10 characters")# Add new column to existing model
class User(Model):
__keyspace__ = 'blog'
id = UUID(primary_key=True)
username = Text(required=True)
email = Text(required=True)
created_at = DateTime(default=datetime.utcnow)
is_active = Boolean(default=True)
last_login = DateTime() # New column
login_count = Counter() # New counter column
# Sync changes
management.sync_table(User)
# The existing data will remain, new columns will be null for existing rows
# Populate new columns for existing users
for user in User.objects().all():
if user.last_login is None:
user.update(last_login=user.created_at, login_count=0)Install with Tessl CLI
npx tessl i tessl/pypi-cassandra-driver