CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-cassandra-driver

Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/cqlengine-orm.md

CQLEngine ORM

Object-relational mapping system with Django-inspired model definitions, query operations, and schema management. CQLEngine provides a high-level interface for working with Cassandra data through Python objects.

Capabilities

Connection Setup

Connection management and configuration for CQLEngine operations.

def setup(hosts, default_keyspace, consistency=ConsistencyLevel.ONE, lazy_connect=False, retry_connect=False, **kwargs):
    """
    Configure the CQLEngine connection to a Cassandra cluster.

    This has to run before any CQLEngine model is used.

    Parameters:
    - hosts (list): Addresses of the hosts to contact
    - default_keyspace (str): Name of the default keyspace (required)
    - consistency (int): Consistency level applied by default (default: ConsistencyLevel.ONE)
    - lazy_connect (bool): Whether the connection is established lazily (default: False)
    - retry_connect (bool): Whether failed connections are retried (default: False)
    - kwargs: Further cluster configuration options
    """

def execute(query, parameters=None, consistency_level=None, timeout=None):
    """
    Run a CQL query directly, bypassing the model layer.

    Parameters:
    - query (str): The CQL query string
    - parameters (list or dict): Values bound to the query (default: None)
    - consistency_level (int): Consistency level used for this query only (default: None)
    - timeout (float): Query timeout, in seconds (default: None)

    Returns:
    ResultSet: The results of executing the query
    """

def get_session():
    """
    Return the session CQLEngine is currently using.

    Returns:
    Session: The active session object
    """

def get_cluster():
    """
    Return the cluster CQLEngine is currently using.

    Returns:
    Cluster: The active cluster object
    """

Model Definition

Base model class and column types for defining Cassandra table models.

class Model:
    """
    Base class for CQLEngine models.

    Subclasses declare their columns as class attributes and configure the
    table mapping through the dunder class attributes below.
    """

    # Name of the Cassandra table backing the model. This is a plain class
    # attribute that subclasses assign (e.g. ``__table_name__ = 'users'``),
    # not a method; None leaves the choice of name to the library.
    __table_name__ = None

    # Keyspace that holds the model's table; None falls back to the default
    # keyspace given to the connection setup.
    __keyspace__ = None

    def __init__(self, **kwargs):
        """
        Build a model instance from field values.

        Parameters:
        - kwargs: Field name/value pairs
        """

    def save(self):
        """
        Persist this instance to Cassandra.

        Returns:
        Model: The saved model instance
        """

    def delete(self):
        """
        Remove this instance from Cassandra.
        """

    def update(self, **kwargs):
        """
        Update selected fields of this instance.

        Parameters:
        - kwargs: Field name/value pairs to update

        Returns:
        Model: The updated model instance
        """

    @classmethod
    def create(cls, **kwargs):
        """
        Build a new instance and save it in one step.

        Parameters:
        - kwargs: Field name/value pairs

        Returns:
        Model: The created model instance
        """

    @classmethod
    def objects(cls):
        """
        Return a QuerySet bound to this model.

        Returns:
        ModelQuerySet: QuerySet for querying model instances
        """

    @classmethod
    def all(cls):
        """
        Return every instance of this model.

        Returns:
        ModelQuerySet: QuerySet containing all instances
        """

    @classmethod
    def get(cls, **kwargs):
        """
        Fetch exactly one instance by primary key.

        Parameters:
        - kwargs: Primary key field values

        Returns:
        Model: The matching model instance

        Raises:
        - DoesNotExist: No matching instance found
        - MultipleObjectsReturned: Multiple instances found
        """

    @classmethod
    def filter(cls, **kwargs):
        """
        Restrict instances by field values.

        Parameters:
        - kwargs: Field filter conditions

        Returns:
        ModelQuerySet: Filtered QuerySet
        """

    def validate(self):
        """
        Check that this instance is valid.

        Raises:
        - ValidationError: If validation fails
        """

    class Meta:
        """
        Nested configuration class (a regular inner class, not a Python metaclass).

        NOTE(review): the upstream library reads table settings from dunder
        class attributes such as ``__table_name__``; confirm whether anything
        placed on ``Meta`` is consulted at all.
        """
        pass

Column Types

Column type definitions for model fields with validation and serialization.

class Column:
    """Common base class shared by every column type."""

    def __init__(
        self,
        primary_key=False,
        partition_key=False,
        clustering_order=None,
        required=True,
        default=None,
        db_field=None,
        index=False,
        static=False,
        discriminator_column=False,
    ):
        """
        Initialize the options common to all columns.

        Parameters:
        - primary_key (bool): True if the column is a primary key column
        - partition_key (bool): True if the column belongs to the partition key
        - clustering_order (str): Clustering order, 'ASC' or 'DESC'
        - required (bool): True if a value must be supplied
        - default: Default value, or a callable producing it
        - db_field (str): Column name to use in the database instead of the attribute name
        - index (bool): True to create a secondary index on the column
        - static (bool): True if the column is static
        - discriminator_column (bool): True if the column is a discriminator column
        """

# Numeric Column Types
class Integer(Column):
    """Signed 32-bit integer column."""

class BigInt(Column):
    """Signed 64-bit integer column."""

class SmallInt(Column):
    """Signed 16-bit integer column."""

class TinyInt(Column):
    """Signed 8-bit integer column."""

class VarInt(Column):
    """Integer column of variable precision."""

class Counter(Column):
    """Column backed by Cassandra's special counter type."""

class Float(Column):
    """32-bit floating point column."""

class Double(Column):
    """64-bit floating point column."""

class Decimal(Column):
    """Decimal column with high precision."""

# Text Column Types
class Text(Column):
    """Column holding text/varchar values."""

class Ascii(Column):
    """Text column restricted to ASCII."""

Varchar = Text  # Alias for Text

# Binary and Network Types
class Blob(Column):
    """Column holding binary data."""

class Inet(Column):
    """Column holding an IP address."""

# Temporal Column Types
class DateTime(Column):
    """Timestamp column exposed as datetime values."""

class Date(Column):
    """Column holding a date without a time component."""

class Time(Column):
    """Column holding a time of day without a date."""

class UUID(Column):
    """Column holding a UUID."""

class TimeUUID(Column):
    """Column holding a time-based (version 1) UUID."""

# Boolean Type
class Boolean(Column):
    """Column holding a boolean."""

# Collection Column Types
class List(Column):
    """Column holding a list collection."""

    def __init__(self, value_type, **kwargs):
        """
        Parameters:
        - value_type (Column): Column type of the list elements
        - kwargs: Further column options
        """

class Set(Column):
    """Column holding a set collection."""

    def __init__(self, value_type, **kwargs):
        """
        Parameters:
        - value_type (Column): Column type of the set elements
        - kwargs: Further column options
        """

class Map(Column):
    """Column holding a map collection."""

    def __init__(self, key_type, value_type, **kwargs):
        """
        Parameters:
        - key_type (Column): Column type of the map keys
        - value_type (Column): Column type of the map values
        - kwargs: Further column options
        """

# Complex Types
class UserDefinedType(Column):
    """Column holding a user-defined type."""

    def __init__(self, user_type, **kwargs):
        """
        Parameters:
        - user_type (UserType): The user-defined type class
        - kwargs: Further column options
        """

Query Operations

QuerySet classes for building and executing queries on models.

class QuerySet:
    """Base class for building and running queries against a model."""

    def filter(self, **kwargs):
        """
        Narrow the results by field values.

        Parameters:
        - kwargs: Field filter conditions

        Returns:
        QuerySet: Filtered QuerySet
        """

    def limit(self, count):
        """
        Cap how many results are returned.

        Parameters:
        - count (int): Maximum number of results

        Returns:
        QuerySet: Limited QuerySet
        """

    def allow_filtering(self):
        """
        Enable server-side filtering (ALLOW FILTERING).

        Returns:
        QuerySet: QuerySet with filtering enabled
        """

    def consistency(self, consistency_level):
        """
        Choose the consistency level used by this query.

        Parameters:
        - consistency_level (int): Consistency level

        Returns:
        QuerySet: QuerySet with specified consistency
        """

    def timeout(self, timeout):
        """
        Choose the timeout used by this query.

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        QuerySet: QuerySet with specified timeout
        """

    def count(self):
        """
        Count the records that match.

        Returns:
        int: Number of matching records
        """

    def get(self):
        """
        Fetch exactly one result.

        Returns:
        Model: Single model instance

        Raises:
        - DoesNotExist: No results found
        - MultipleObjectsReturned: Multiple results found
        """

    def first(self):
        """
        Fetch the first result, if any.

        Returns:
        Model: First model instance or None
        """

    def all(self):
        """
        Fetch all results.

        Returns:
        list: List of model instances

        NOTE(review): the upstream driver documents ``all()`` as returning a
        queryset rather than a list; confirm the return type.
        """

    def __iter__(self):
        """Iterate over the results."""

    def __len__(self):
        """Return how many results there are."""

class ModelQuerySet(QuerySet):
    """QuerySet specialised for model classes."""

    def update(self, **kwargs):
        """
        Apply an update to every matching record.

        Parameters:
        - kwargs: Field values to update

        Returns:
        int: Number of updated records

        NOTE(review): verify against the upstream driver whether a count is
        actually returned.
        """

    def delete(self):
        """
        Remove every matching record.

        Returns:
        int: Number of deleted records

        NOTE(review): verify against the upstream driver whether a count is
        actually returned.
        """

    def values_list(self, *fields, flat=False):
        """
        Yield selected field values in place of model instances.

        Parameters:
        - fields: Field names to include
        - flat (bool): Return flat list for single field (default: False)

        Returns:
        list: List of field value tuples or single values
        """

    def only(self, *fields):
        """
        Restrict fetching to the given fields.

        Parameters:
        - fields: Field names to fetch

        Returns:
        ModelQuerySet: QuerySet with limited fields
        """

class BatchQuery:
    """
    Group several operations into a single batch.

    NOTE(review): in the upstream driver, operations are attached to a batch
    through ``Model.batch(batch)`` (for example
    ``Post.batch(b).create(...)``), and ``BatchQuery`` itself exposes
    ``add_query``/``execute``; confirm that the ``save``/``create``/
    ``update``/``delete`` methods listed here exist in the targeted version.
    """

    def __init__(self, batch_type=None, consistency=None, execute_on_exception=False):
        """
        Set up the batch.

        Parameters:
        - batch_type (int): Type of batch (LOGGED, UNLOGGED, COUNTER)
        - consistency (int): Consistency level for the batch
        - execute_on_exception (bool): Execute batch even if exception occurs (default: False)
        """

    def save(self, model_instance):
        """
        Queue a model save in the batch.

        Parameters:
        - model_instance (Model): Model instance to save
        """

    def create(self, model_class, **kwargs):
        """
        Queue the creation of a model instance in the batch.

        Parameters:
        - model_class (Model): Model class to create
        - kwargs: Field values for new instance
        """

    def update(self, model_instance, **kwargs):
        """
        Queue a model update in the batch.

        Parameters:
        - model_instance (Model): Model instance to update
        - kwargs: Field values to update
        """

    def delete(self, model_instance):
        """
        Queue a model deletion in the batch.

        Parameters:
        - model_instance (Model): Model instance to delete
        """

    def execute(self):
        """Run every operation queued in the batch."""

    def __enter__(self):
        """Enter the context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager."""

Schema Management

Functions for managing keyspaces, tables, and types.

def sync_table(model, keyspaces=None):
    """
    Create the table for a model, or bring its schema up to date.

    Parameters:
    - model (Model): The model class whose table is synced
    - keyspaces (list): Keyspaces to sync in (default: all configured)
    """

def sync_type(keyspace, user_type_model):
    """
    Create a user-defined type, or bring it up to date.

    Parameters:
    - keyspace (str): Name of the keyspace
    - user_type_model (UserType): The user type model to sync
    """

def create_keyspace_simple(keyspace_name, replication_factor):
    """
    Create a keyspace that uses SimpleStrategy.

    Parameters:
    - keyspace_name (str): Name of the keyspace to create
    - replication_factor (int): Replication factor of the keyspace
    """

def create_keyspace_network_topology(keyspace_name, dc_replication_map):
    """
    Create a keyspace that uses NetworkTopologyStrategy.

    Parameters:
    - keyspace_name (str): Name of the keyspace to create
    - dc_replication_map (dict): Replication factor for each datacenter
    """

def drop_keyspace(keyspace_name):
    """
    Remove a keyspace.

    Parameters:
    - keyspace_name (str): Name of the keyspace to drop
    """

def drop_table(model):
    """
    Remove the table that belongs to a model.

    Parameters:
    - model (Model): The model class whose table is dropped
    """

User-Defined Types

Support for Cassandra user-defined types in models.

class UserType:
    """Base class from which user-defined types are derived."""

    def __init__(self, **kwargs):
        """
        Build a UDT instance from field values.

        Parameters:
        - kwargs: Field name/value pairs
        """

    @classmethod
    def create(cls, **kwargs):
        """
        Construct a UDT instance.

        Parameters:
        - kwargs: Field values

        Returns:
        UserType: Created UDT instance
        """

    def validate(self):
        """
        Check that this UDT instance is valid.

        Raises:
        - ValidationError: If validation fails
        """

Query Functions

CQL functions for use in queries.

def Token(*columns):
    """
    Build the token function over partition key columns.

    Parameters:
    - columns: Column values the token is generated from

    Returns:
    Function object for use in queries
    """

def MinTimeUUID(timestamp):
    """
    Build the minimum TimeUUID corresponding to a timestamp.

    Parameters:
    - timestamp (datetime): Timestamp the UUID is generated for

    Returns:
    Function object for use in queries
    """

def MaxTimeUUID(timestamp):
    """
    Build the maximum TimeUUID corresponding to a timestamp.

    Parameters:
    - timestamp (datetime): Timestamp the UUID is generated for

    Returns:
    Function object for use in queries
    """

Exception Classes

CQLEngine-specific exception classes.

class CQLEngineException(Exception):
    """Root of the exceptions raised by CQLEngine operations."""

class ValidationError(CQLEngineException):
    """Raised when a field fails validation."""

class DoesNotExist(CQLEngineException):
    """Raised when the requested model instance does not exist."""

class MultipleObjectsReturned(CQLEngineException):
    """Raised when several instances come back where one was expected."""

class LWTException(CQLEngineException):
    """Raised when a lightweight transaction (conditional update) fails."""

Usage Examples

Model Definition and Setup

import uuid
from datetime import datetime

from cassandra import ConsistencyLevel
from cassandra.cqlengine import connection, management
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.columns import *
from cassandra.cqlengine.usertype import UserType

# Setup connection (the keyword argument is `default_keyspace`, not `keyspace`)
connection.setup(['127.0.0.1'], default_keyspace='blog', consistency=ConsistencyLevel.ONE)

# Create keyspace
management.create_keyspace_simple('blog', 1)

# Define the user-defined type used by User.profile. UserDefinedType takes
# the UserType class itself rather than the type name as a string.
class UserProfile(UserType):
    __type_name__ = 'user_profile'

    bio = Text()
    website = Text()
    location = Text()
    birth_date = Date()

# Define models
class User(Model):
    __keyspace__ = 'blog'
    __table_name__ = 'users'

    id = UUID(primary_key=True, default=uuid.uuid4)
    username = Text(required=True, index=True)
    email = Text(required=True)
    created_at = DateTime(default=datetime.utcnow)
    is_active = Boolean(default=True)
    profile = UserDefinedType(user_type=UserProfile)

class Post(Model):
    __keyspace__ = 'blog'
    # The table name is configured with __table_name__, as on User above
    __table_name__ = 'posts'

    id = UUID(partition_key=True, default=uuid.uuid4)
    # Clustering key; the default lets Post.create() be called without it
    created_at = DateTime(primary_key=True, clustering_order='DESC', default=datetime.utcnow)
    author_id = UUID(required=True, index=True)
    title = Text(required=True)
    content = Text(required=True)
    tags = Set(Text)
    # A plain integer: Counter columns cannot share a model with regular
    # data columns such as title and content
    view_count = Integer(default=0)

# Sync table schemas
management.sync_table(User)
management.sync_table(Post)

Basic CRUD Operations

from datetime import datetime

# Create users
user1 = User.create(
    username='alice',
    email='alice@example.com'
)

user2 = User(
    username='bob',
    email='bob@example.com',
    created_at=datetime.utcnow()
)
user2.save()

# Read users (username has a secondary index, so it can be queried directly)
alice = User.get(username='alice')
print(f"User: {alice.username} ({alice.email})")

# Update user
alice.email = 'alice.smith@example.com'
alice.save()

# Or update specific fields
alice.update(is_active=False)

# Delete users. is_active is neither a key nor an indexed column, so the
# query has to opt in to ALLOW FILTERING.
inactive_users = User.filter(is_active=False).allow_filtering()
for user in inactive_users:
    user.delete()

Advanced Querying

from datetime import datetime, timedelta

from cassandra import ConsistencyLevel
from cassandra.cqlengine.functions import Token

# Complex queries. A range on the clustering key without the partition key
# is only accepted once allow_filtering() is called.
recent_posts = Post.filter(
    created_at__gte=datetime.utcnow() - timedelta(days=7)
).allow_filtering().limit(10)

for post in recent_posts:
    print(f"Post: {post.title} by {post.author_id}")

# Query with filtering
tagged_posts = Post.filter(
    tags__contains='python'
).allow_filtering().all()

# Query with specific consistency
important_posts = Post.objects().consistency(ConsistencyLevel.QUORUM).filter(
    author_id=user1.id
)

# Count queries
total_posts = Post.objects().count()
alice_posts = Post.filter(author_id=alice.id).count()

# Field selection
usernames = User.objects().values_list('username', flat=True)
user_info = User.objects().values_list('username', 'email')

# Token-based queries (for pagination)
# last_post_id stands for the partition key (Post.id) of the final row of
# the previous page.
posts_page = Post.filter(
    pk__token__gt=Token(last_post_id)
).limit(20)

Working with Collections

from datetime import datetime

# Create post with tags. created_at is part of Post's primary key, so it is
# supplied explicitly.
post = Post.create(
    created_at=datetime.utcnow(),
    author_id=alice.id,
    title='Python and Cassandra',
    content='How to use CQLEngine...',
    tags={'python', 'cassandra', 'database'}
)

# Update collections
post.tags.add('tutorial')
post.tags.remove('database')
post.save()

# Query by collection values
python_posts = Post.filter(tags__contains='python').allow_filtering()

Batch Operations

from datetime import datetime

from cassandra.cqlengine.query import BatchQuery

# Create multiple records in a batch. Operations join a batch through
# Model.batch(batch); created_at is part of Post's primary key, so it is
# supplied explicitly.
with BatchQuery() as batch:
    Post.batch(batch).create(
        created_at=datetime.utcnow(),
        author_id=alice.id,
        title='First Post',
        content='Hello world!')
    Post.batch(batch).create(
        created_at=datetime.utcnow(),
        author_id=alice.id,
        title='Second Post',
        content='More content...')

# Update multiple records in a batch (iterate the queryset itself, not a
# count of its rows)
with BatchQuery() as batch:
    for post in Post.filter(author_id=alice.id):
        # view_count may be unset on rows written before the column existed
        post.batch(batch).update(view_count=(post.view_count or 0) + 1)

# Mixed operations in batch
# new_user, existing_user and old_user stand for User instances obtained
# elsewhere.
with BatchQuery() as batch:
    new_user.batch(batch).save()
    existing_user.batch(batch).update(email='new@example.com')
    old_user.batch(batch).delete()

User-Defined Types

from cassandra.cqlengine.usertype import UserType

# Define UDT
class UserProfile(UserType):
    __keyspace__ = 'blog'
    __type_name__ = 'user_profile'

    bio = Text()
    website = Text()
    location = Text()
    birth_date = Date()

# Sync UDT
management.sync_type('blog', UserProfile)

# Use UDT in model
class User(Model):
    __keyspace__ = 'blog'

    id = UUID(primary_key=True, default=uuid.uuid4)
    username = Text(required=True)
    profile = UserDefinedType(user_type=UserProfile)

# Sync the model's table so that rows can be written to it
management.sync_table(User)

# Create user with UDT
profile = UserProfile(
    bio='Software developer',
    website='https://alice.dev',
    location='San Francisco, CA'
)

user = User.create(
    username='alice',
    profile=profile
)

# Access UDT fields
print(f"Bio: {user.profile.bio}")
print(f"Location: {user.profile.location}")

Time-Based Queries

from cassandra.cqlengine.functions import MinTimeUUID, MaxTimeUUID
from datetime import datetime, timedelta

# Query posts from a specific time range
start_time = datetime.utcnow() - timedelta(days=1)
end_time = datetime.utcnow()

# For TimeUUID columns
# NOTE(review): MinTimeUUID/MaxTimeUUID are meant for TimeUUID columns, while
# Post.created_at is declared as DateTime in the model example; confirm the
# intended column type before reusing this query.
# A range on the clustering key without the partition key needs
# allow_filtering().
posts_today = Post.filter(
    created_at__gte=MinTimeUUID(start_time),
    created_at__lte=MaxTimeUUID(end_time)
).allow_filtering()

# Time-based pagination
last_week = datetime.utcnow() - timedelta(days=7)
posts_page = Post.filter(
    created_at__gte=MinTimeUUID(last_week)
).allow_filtering().limit(50)

for post in posts_page:
    print(f"Posted at: {post.created_at}")

Error Handling

import uuid

from cassandra.cqlengine import ValidationError
# The query exceptions live in the query module
from cassandra.cqlengine.query import DoesNotExist, MultipleObjectsReturned

# Handle missing records
try:
    user = User.get(username='nonexistent')
except DoesNotExist:
    print("User not found")

# Handle multiple results. is_active is not indexed, so both queries opt in
# to ALLOW FILTERING.
try:
    user = User.objects().allow_filtering().get(is_active=True)  # Multiple users might be active
except MultipleObjectsReturned:
    print("Multiple active users found")
    users = User.filter(is_active=True).allow_filtering()
    user = users.first()

# Handle validation errors
try:
    invalid_user = User(username='', email='invalid-email')
    invalid_user.validate()
except ValidationError as e:
    print(f"Validation failed: {e}")

# Custom validation
class Post(Model):
    # Every model needs a primary key
    id = UUID(primary_key=True, default=uuid.uuid4)
    title = Text(required=True)
    content = Text(required=True)

    def validate(self):
        # Run the built-in column validation first (required fields, types)
        super().validate()
        if len(self.title) < 3:
            raise ValidationError("Title must be at least 3 characters")
        if len(self.content) < 10:
            raise ValidationError("Content must be at least 10 characters")

Schema Migration

# Add new column to existing model
class User(Model):
    __keyspace__ = 'blog'
    # Keep pointing at the table the original User model was synced to
    __table_name__ = 'users'

    id = UUID(primary_key=True)
    username = Text(required=True)
    email = Text(required=True)
    created_at = DateTime(default=datetime.utcnow)
    is_active = Boolean(default=True)
    last_login = DateTime()  # New column
    # New column. A plain integer is used because Counter columns cannot be
    # mixed with regular data columns in one model.
    login_count = Integer(default=0)

# Sync changes
management.sync_table(User)

# The existing data will remain, new columns will be null for existing rows

# Populate new columns for existing users
for user in User.objects().all():
    if user.last_login is None:
        user.update(last_login=user.created_at, login_count=0)

Install with Tessl CLI

npx tessl i tessl/pypi-cassandra-driver

docs

async-io.md

auth-policies.md

cluster-session.md

cql-types.md

cqlengine-orm.md

index.md

metadata.md

query-execution.md

tile.json