CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-cassandra-driver

Python driver for Apache Cassandra with comprehensive CQL support, connection pooling, and ORM capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/async-io.md

Asynchronous I/O

I/O reactor implementations, concurrent execution utilities, and asynchronous operation patterns for high-performance applications. The cassandra-driver supports multiple async frameworks and provides utilities for concurrent query execution.

Capabilities

I/O Reactor Implementations

Connection implementations for different asynchronous frameworks.

class AsyncoreConnection:
    """
    Connection implementation using Python's asyncore module.

    This is the default connection class providing basic asynchronous I/O
    without external dependencies.

    NOTE(review): asyncore was removed from the CPython standard library in
    Python 3.12 -- confirm this reactor's availability (and the driver's
    actual default) for the driver/interpreter versions in use.
    """

class GeventConnection:
    """
    Connection implementation using Gevent for async I/O.

    Requires: gevent package
    Usage: Set as connection_class in Cluster configuration, e.g.
           Cluster(connection_class=GeventConnection)
    """

class EventletConnection:
    """
    Connection implementation using Eventlet for async I/O.

    Requires: eventlet package
    Usage: Set as connection_class in Cluster configuration, e.g.
           Cluster(connection_class=EventletConnection)
    """

class TwistedConnection:
    """
    Connection implementation using the Twisted framework's reactor.

    Requires: Twisted package
    Usage: Set as connection_class in Cluster configuration, e.g.
           Cluster(connection_class=TwistedConnection)
    """

class LibevConnection:
    """
    Connection implementation using libev for high-performance I/O.

    Requires: libev system library and C extension compilation
    Usage: Set as connection_class in Cluster configuration, e.g.
           Cluster(connection_class=LibevConnection)
    """

Concurrent Execution

Utilities for executing multiple queries concurrently for improved throughput.

def execute_concurrent(session, statements_and_parameters, concurrency=100, results_generator=False):
    """
    Execute multiple statements concurrently.

    Parameters:
    - session (Session): Session to execute queries on
    - statements_and_parameters (iterable): Sequence of (statement, parameters) tuples
    - concurrency (int): Maximum number of concurrent in-flight requests
    - results_generator (bool): Return a lazy generator instead of a fully
      materialized list (useful for very large workloads)

    Returns:
    list or generator: Results from query execution, with None for failed queries

    NOTE(review): the upstream driver documents the return value as a sequence
    of (success, result_or_exc) pairs and raises on the first error unless
    raise_on_first_error=False -- verify against the installed driver version.

    Example (simple statements bind parameters with %s placeholders, not ?):
    statements = [
        (SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Alice']),
        (SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Bob']),
    ]
    results = execute_concurrent(session, statements)
    """

def execute_concurrent_with_args(session, statement, parameters, concurrency=100, results_generator=False):
    """
    Execute a single statement with multiple parameter sets concurrently.

    Parameters:
    - session (Session): Session to execute queries on
    - statement (str or Statement): Statement to execute repeatedly
    - parameters (iterable): Sequence of parameter lists/dicts, one per execution
    - concurrency (int): Maximum number of concurrent in-flight requests
    - results_generator (bool): Return a lazy generator instead of a list

    Returns:
    list or generator: Results from query execution, with None for failed queries

    Example (simple statements bind parameters with %s placeholders, not ?):
    statement = "INSERT INTO users (id, name) VALUES (%s, %s)"
    parameters = [
        [uuid.uuid4(), 'Alice'],
        [uuid.uuid4(), 'Bob'],
        [uuid.uuid4(), 'Charlie']
    ]
    results = execute_concurrent_with_args(session, statement, parameters)
    """

Asynchronous Response Handling

Enhanced response future handling for asynchronous operations.

class ResponseFuture:
    """
    Future object representing an asynchronous query execution.

    Obtained from Session.execute_async(); supports both blocking retrieval
    via result() and callback-style completion handling.
    """

    def result(self, timeout=None):
        """
        Block and wait for the query result.

        Parameters:
        - timeout (float): Maximum time to wait in seconds (None = wait indefinitely)

        Returns:
        ResultSet: Query results

        Raises:
        - OperationTimedOut: If timeout is exceeded
        - Various query-specific exceptions
        """

    def get_query_trace(self, max_wait=2.0):
        """
        Get query trace information if tracing was enabled.

        Parameters:
        - max_wait (float): Maximum time to wait for the trace to become available

        Returns:
        QueryTrace: Trace information, or None if unavailable
        """

    def add_callback(self, fn, *args, **kwargs):
        """
        Add success callback to be executed when query completes successfully.

        Parameters:
        - fn (callable): Callback function
        - args: Additional arguments for callback
        - kwargs: Additional keyword arguments for callback

        The callback will be called with: fn(result, *args, **kwargs)

        NOTE(review): callbacks typically run on the driver's I/O thread, so
        they should be quick and non-blocking -- confirm against the driver
        documentation for the version in use.
        """

    def add_errback(self, fn, *args, **kwargs):
        """
        Add error callback to be executed when query fails.

        Parameters:
        - fn (callable): Error callback function
        - args: Additional arguments for callback
        - kwargs: Additional keyword arguments for callback

        The errback will be called with: fn(exception, *args, **kwargs)
        """

    def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):
        """
        Add both success and error callbacks in a single call.

        Parameters:
        - callback (callable): Success callback
        - errback (callable): Error callback
        - callback_args (tuple): Arguments for success callback
        - callback_kwargs (dict): Keyword arguments for success callback
        - errback_args (tuple): Arguments for error callback
        - errback_kwargs (dict): Keyword arguments for error callback
        """

    @property
    def query(self):
        """str or Statement: The query that was executed"""

    @property
    def session(self):
        """Session: The session used for execution"""

    @property
    def coordinator_host(self):
        """Host: The coordinator host that served this query"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages of results to fetch"""

    @property
    def warnings(self):
        """list: Warning messages returned by the server for this query"""

    @property
    def custom_payload(self):
        """dict: Custom payload returned by the server"""

    @property
    def is_schema_agreed(self):
        """bool: Whether cluster-wide schema agreement was reached"""

Connection Pool Events

Event-driven connection pool management for monitoring and debugging.

class HostConnectionPool:
    """
    Connection pool for a specific host with async capabilities.

    Manages the set of open connections to a single Cassandra node.
    """

    def get_connections(self):
        """
        Get all connections in the pool.

        Returns:
        set: Set of active connections
        """

    def return_connection(self, connection):
        """
        Return a previously borrowed connection to the pool.

        Parameters:
        - connection: Connection object to return
        """

    def shutdown(self):
        """
        Shutdown the connection pool and close all connections.

        After this call, is_shutdown is True and the pool must not be reused.
        """

    @property
    def host(self):
        """Host: The host this pool connects to"""

    @property
    def is_shutdown(self):
        """bool: Whether the pool has been shut down"""

    @property
    def open_count(self):
        """int: Number of currently open connections"""

Usage Examples

Basic Asynchronous Operations

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
import uuid

# Connect to the cluster; 'keyspace1' becomes the session's default keyspace.
cluster = Cluster()
session = cluster.connect('keyspace1')

# Execute query asynchronously
future = session.execute_async("SELECT * FROM users")

# Option 1: Block and wait for result
result = future.result(timeout=10.0)
for row in result:
    print(f"User: {row.name}")

# Option 2: Use callbacks
# NOTE(review): callbacks typically fire on the driver's I/O thread, so they
# should be quick and must not block -- confirm against the driver docs.
def handle_success(result):
    print(f"Query returned {len(result)} rows")
    for row in result:
        print(f"User: {row.name}")

def handle_error(exception):
    print(f"Query failed: {exception}")

# Register both handlers before the query completes.
future = session.execute_async("SELECT * FROM users")
future.add_callback(handle_success)
future.add_errback(handle_error)

# Continue with other work while query executes in background
print("Query executing in background...")

Using Different I/O Reactors

from cassandra.cluster import Cluster
# NOTE(review): each reactor import below requires its optional dependency to
# be installed; importing a reactor whose package is missing raises
# ImportError, so real code should import only the reactor it actually uses.
from cassandra.io.geventreactor import GeventConnection
from cassandra.io.eventletreactor import EventletConnection
from cassandra.io.twistedreactor import TwistedConnection
from cassandra.io.libevreactor import LibevConnection

# Gevent reactor (requires: pip install gevent)
cluster_gevent = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=GeventConnection
)

# Eventlet reactor (requires: pip install eventlet)
cluster_eventlet = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=EventletConnection
)

# Twisted reactor (requires: pip install twisted)
cluster_twisted = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=TwistedConnection
)

# Libev reactor (requires libev system library)
cluster_libev = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=LibevConnection
)

# Use the appropriate reactor for your application
session = cluster_gevent.connect()

Concurrent Query Execution

from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
from cassandra.query import SimpleStatement
import uuid
from datetime import datetime

# Example 1: Execute different statements concurrently.
# The Python driver binds positional parameters for simple (non-prepared)
# statements with %s placeholders; '?' is only valid in prepared statements,
# so the original '?' examples would fail at runtime.
statements_and_params = [
    (SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Alice', 'alice@example.com']),
    (SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Bob', 'bob@example.com']),
    (SimpleStatement("INSERT INTO posts (id, author, title) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Alice', 'My First Post']),
    (SimpleStatement("UPDATE counters SET count = count + 1 WHERE id = %s"),
     ['total_users'])
]

# Execute all statements concurrently
results = execute_concurrent(session, statements_and_params, concurrency=50)

# Check results (this document's convention: None marks a failed statement)
for i, result in enumerate(results):
    if result is None:
        print(f"Statement {i} failed")
    else:
        print(f"Statement {i} succeeded")

# Example 2: Execute same statement with different parameters
insert_statement = SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)")
user_data = [
    [uuid.uuid4(), 'User1', 'user1@example.com'],
    [uuid.uuid4(), 'User2', 'user2@example.com'],
    [uuid.uuid4(), 'User3', 'user3@example.com'],
    # ... many more users
]

# Execute all inserts concurrently
results = execute_concurrent_with_args(
    session,
    insert_statement,
    user_data,
    concurrency=100
)

successful_inserts = sum(1 for result in results if result is not None)
print(f"Successfully inserted {successful_inserts} users")

Advanced Callback Patterns

import threading
from collections import defaultdict

class AsyncQueryManager:
    """Fan out asynchronous queries in named groups and collect results.

    Successful results and errors are accumulated per group; a summary is
    printed once every submitted query has completed. All shared state is
    guarded by a lock because driver callbacks fire on I/O threads.
    """

    def __init__(self, session):
        self.session = session
        self.results = defaultdict(list)   # group_name -> successful results
        self.errors = defaultdict(list)    # group_name -> exceptions
        self.lock = threading.Lock()
        self.completed_count = 0
        self.total_queries = 0

    def execute_queries(self, query_groups):
        """Execute multiple groups of queries with organized results.

        Parameters:
        - query_groups (dict): mapping of group_name -> [(query, params), ...]
        """
        # Reset the counters so the manager can be reused for a second run.
        # The original never reset completed_count, so a second call would
        # report completion prematurely.
        with self.lock:
            self.completed_count = 0
            self.total_queries = sum(len(queries) for queries in query_groups.values())

        for group_name, queries in query_groups.items():
            for query, params in queries:
                future = self.session.execute_async(query, params)
                future.add_callback(self._handle_success, group_name)
                future.add_errback(self._handle_error, group_name)

    def _handle_success(self, result, group_name):
        # Success callback: record the result under its group.
        with self.lock:
            self.results[group_name].append(result)
            self.completed_count += 1
            self._check_completion()

    def _handle_error(self, error, group_name):
        # Error callback: record the exception under its group.
        with self.lock:
            self.errors[group_name].append(error)
            self.completed_count += 1
            self._check_completion()

    def _check_completion(self):
        # Must be called with self.lock held.
        if self.completed_count >= self.total_queries:
            print("All queries completed!")
            self._print_summary()

    def _print_summary(self):
        # Iterate the UNION of groups: a group whose queries ALL failed
        # appears only in self.errors, and the original results-only
        # iteration silently skipped it.
        for group_name in sorted(set(self.results) | set(self.errors)):
            success_count = len(self.results[group_name])
            error_count = len(self.errors[group_name])
            print(f"{group_name}: {success_count} successful, {error_count} failed")

# Usage
manager = AsyncQueryManager(session)

# %s placeholders: the Python driver binds parameters for simple
# (non-prepared) statements with %s; '?' is only valid in prepared
# statements and would fail at runtime.
query_groups = {
    'user_inserts': [
        ("INSERT INTO users (id, name) VALUES (%s, %s)", [uuid.uuid4(), f'User{i}'])
        for i in range(100)
    ],
    'post_inserts': [
        ("INSERT INTO posts (id, title) VALUES (%s, %s)", [uuid.uuid4(), f'Post{i}'])
        for i in range(50)
    ],
    'analytics_queries': [
        ("SELECT COUNT(*) FROM users", []),
        ("SELECT COUNT(*) FROM posts", []),
        ("SELECT * FROM recent_activity LIMIT 10", [])
    ]
}

manager.execute_queries(query_groups)

Asynchronous Pagination

class AsyncPaginator:
    """Drive page-by-page retrieval of a query without blocking the caller.

    Each fetched page is handed to every registered page callback; any
    failure is fanned out to every registered error callback. Subsequent
    pages are requested automatically from the page handler.
    """

    def __init__(self, session, query, page_size=1000):
        self.session = session
        self.query = query
        self.page_size = page_size
        self.callbacks = []
        self.error_callbacks = []

    def add_page_callback(self, callback):
        """Register a callable invoked with the rows of every page."""
        self.callbacks.append(callback)

    def add_error_callback(self, callback):
        """Register a callable invoked with any pagination error."""
        self.error_callbacks.append(callback)

    def start(self):
        """Kick off the first page fetch; later pages chain automatically."""
        self._submit(SimpleStatement(self.query, fetch_size=self.page_size))

    def _submit(self, statement):
        # Wire both success and error handlers onto the in-flight request.
        future = self.session.execute_async(statement)
        future.add_callback(self._handle_page)
        future.add_errback(self._handle_error)

    def _handle_page(self, result):
        """Deliver one page to the callbacks, then chain the next fetch."""
        for page_cb in self.callbacks:
            page_cb(result.current_rows)

        if not result.has_more_pages:
            print("Pagination complete")
            return

        # Carry the server-side paging state forward into the next request.
        next_statement = SimpleStatement(self.query, fetch_size=self.page_size)
        next_statement.paging_state = result.paging_state
        self._submit(next_statement)

    def _handle_error(self, error):
        """Fan the error out to every registered error callback."""
        for error_cb in self.error_callbacks:
            error_cb(error)

# Usage
# Per-page handler: receives the list of rows for each fetched page.
def process_page(rows):
    print(f"Processing page with {len(rows)} rows")
    for row in rows:
        # Process each row
        pass

# Error handler: receives the exception raised during pagination.
def handle_error(error):
    print(f"Pagination error: {error}")

paginator = AsyncPaginator(session, "SELECT * FROM large_table")
paginator.add_page_callback(process_page)
paginator.add_error_callback(handle_error)
paginator.start()

# Continue with other work while pagination happens in background

Asynchronous Batch Processing

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncBatchProcessor:
    """Split a record stream into batches and execute each batch's
    statements concurrently on a worker thread pool.

    NOTE(review): relies on ``execute_concurrent`` being imported by the
    surrounding module (see the cassandra.concurrent import earlier in
    this document).
    """

    def __init__(self, session, batch_size=1000, concurrency=10):
        self.session = session
        self.batch_size = batch_size          # records per batch
        self.concurrency = concurrency        # worker threads / driver requests
        self.executor = ThreadPoolExecutor(max_workers=concurrency)

    @staticmethod
    def _make_batches(records, batch_size):
        """Split *records* into consecutive chunks of at most *batch_size*."""
        return [
            records[i:i + batch_size]
            for i in range(0, len(records), batch_size)
        ]

    def process_records(self, records, process_func):
        """Process records in batches asynchronously.

        Parameters:
        - records (sequence): Records to process
        - process_func (callable): record -> (query, params)

        Returns:
        list: One result list per batch, or None for a batch that failed.
        """
        batches = self._make_batches(records, self.batch_size)

        print(f"Processing {len(records)} records in {len(batches)} batches")

        # Submit every batch to the pool, then collect in submission order.
        futures = [
            self.executor.submit(self._process_batch, batch, process_func)
            for batch in batches
        ]

        results = []
        for future in futures:
            try:
                results.append(future.result(timeout=60))
            except Exception as e:
                # A failed/timed-out batch is recorded as None rather than
                # aborting the remaining batches.
                print(f"Batch processing error: {e}")
                results.append(None)

        return results

    def _process_batch(self, batch, process_func):
        """Build (statement, params) pairs for one batch and run them
        concurrently on the driver."""
        statements_and_params = [process_func(record) for record in batch]

        return execute_concurrent(
            self.session,
            statements_and_params,
            concurrency=self.concurrency
        )

    def shutdown(self, wait=True):
        """Release the worker threads.

        The original version leaked the ThreadPoolExecutor; call this when
        the processor is no longer needed.
        """
        self.executor.shutdown(wait=wait)

# Usage
def create_insert_statement(user_data):
    """Convert a user dict into an (insert CQL, params) pair.

    Parameters:
    - user_data (dict): must contain 'id', 'name' and 'email' keys.

    Returns:
    tuple: (query string, parameter list) suitable for execute_concurrent.
    """
    # %s placeholders: the Python driver binds parameters for simple
    # (non-prepared) statements with %s; the original '?' form is only
    # valid in prepared statements and would fail at runtime.
    query = "INSERT INTO users (id, name, email, created_at) VALUES (%s, %s, %s, %s)"
    params = [
        user_data['id'],
        user_data['name'],
        user_data['email'],
        datetime.utcnow()
    ]
    return query, params

# Process large dataset
user_records = [
    {'id': uuid.uuid4(), 'name': f'User{i}', 'email': f'user{i}@example.com'}
    for i in range(10000)
]

# 10000 records / batch_size 500 = 20 batches, up to 20 running at once.
processor = AsyncBatchProcessor(session, batch_size=500, concurrency=20)
results = processor.process_records(user_records, create_insert_statement)

# Analyze results (one entry per batch; None marks a failed batch)
total_batches = len(results)
successful_batches = sum(1 for r in results if r is not None)
print(f"Processed {total_batches} batches, {successful_batches} successful")

Connection Pool Monitoring

from cassandra.pool import Host

class ConnectionPoolMonitor:
    """Read-only view over a cluster's per-host connection pools.

    NOTE(review): reads the private ``_connection_pools`` attribute of the
    Cluster object -- not a public API; verify it still exists in the
    driver version in use.
    """

    def __init__(self, cluster):
        self.cluster = cluster

    def get_pool_stats(self):
        """Return {host_address: stats_dict} for every host with a pool."""
        stats = {}

        metadata = getattr(self.cluster, 'metadata', None)
        if not metadata:
            return stats

        for host in metadata.all_hosts():
            pool = self.cluster._connection_pools.get(host)
            if not pool:
                continue
            shut_down = pool.is_shutdown
            stats[host.address] = {
                'host_state': 'UP' if host.is_up else 'DOWN',
                'datacenter': host.datacenter,
                'rack': host.rack,
                'open_connections': pool.open_count,
                'is_pool_shutdown': shut_down,
                'pool_size': 0 if shut_down else len(pool.get_connections())
            }

        return stats

    def print_pool_summary(self):
        """Print a human-readable summary of every host's pool status."""
        stats = self.get_pool_stats()

        print("Connection Pool Summary:")
        print("-" * 60)

        for host_address, host_stats in stats.items():
            print(f"Host: {host_address}")
            print(f"  State: {host_stats['host_state']}")
            print(f"  DC/Rack: {host_stats['datacenter']}/{host_stats['rack']}")
            print(f"  Open Connections: {host_stats['open_connections']}")
            print(f"  Pool Size: {host_stats['pool_size']}")
            print()

# Usage
monitor = ConnectionPoolMonitor(cluster)

# Monitor pool stats periodically
import time
import threading

# Poll loop: prints the pool summary forever; runs on a daemon thread below.
def monitor_pools():
    while True:
        monitor.print_pool_summary()
        time.sleep(30)  # Check every 30 seconds

# Start monitoring in background thread
# daemon=True: the thread dies with the process, so it never blocks shutdown.
monitor_thread = threading.Thread(target=monitor_pools, daemon=True)
monitor_thread.start()

# Your application continues running
session = cluster.connect()
# ... perform queries ...

Asynchronous Query Timeout Handling

from cassandra import OperationTimedOut
import time

class TimeoutHandler:
    """Execute queries with a timeout budget, retrying timed-out attempts
    with exponential backoff while tracking aggregate statistics.

    Stats tracked: total_queries, timeouts, retries, failures.
    """

    def __init__(self, session, default_timeout=10.0):
        self.session = session
        self.default_timeout = default_timeout
        self.timeout_stats = {
            'total_queries': 0,
            'timeouts': 0,
            'retries': 0,
            'failures': 0
        }

    def execute_with_retry(self, query, params=None, max_retries=3, timeout=None):
        """Run *query*, retrying up to *max_retries* times on OperationTimedOut.

        Parameters:
        - query: Statement or query string to execute
        - params: Optional bind parameters
        - max_retries (int): Number of retries after the first attempt
        - timeout (float): Per-attempt timeout; falls back to default_timeout

        Non-timeout exceptions are counted as failures and re-raised at once.
        """
        effective = timeout or self.default_timeout

        attempt = 0
        while True:
            self.timeout_stats['total_queries'] += 1
            try:
                future = self.session.execute_async(query, params)
                return future.result(timeout=effective)

            except OperationTimedOut:
                self.timeout_stats['timeouts'] += 1

                if attempt >= max_retries:
                    # Budget exhausted: record the failure and propagate.
                    self.timeout_stats['failures'] += 1
                    print(f"Query failed after {max_retries} retries")
                    raise

                self.timeout_stats['retries'] += 1
                print(f"Query timed out, retrying (attempt {attempt + 1}/{max_retries})")

                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)
                attempt += 1

            except Exception as e:
                # Anything other than a timeout is not retried.
                self.timeout_stats['failures'] += 1
                print(f"Query failed with non-timeout error: {e}")
                raise

    def get_timeout_stats(self):
        """Return a copy of the stats, plus timeout_rate / success_rate
        once at least one query has been attempted."""
        snapshot = dict(self.timeout_stats)
        total = snapshot['total_queries']
        if total > 0:
            snapshot['timeout_rate'] = snapshot['timeouts'] / total
            snapshot['success_rate'] = (total - snapshot['failures']) / total
        return snapshot

# Usage
timeout_handler = TimeoutHandler(session, default_timeout=5.0)

# Execute queries with timeout handling. %s is the simple-statement
# placeholder style for the Python driver; the original '?' form is only
# valid in prepared statements and would fail at runtime.
try:
    result = timeout_handler.execute_with_retry(
        "SELECT * FROM slow_table WHERE complex_condition = %s",
        params=['some_value'],
        max_retries=2,
        timeout=15.0
    )
    print(f"Query succeeded with {len(result)} results")

except OperationTimedOut:
    print("Query timed out after all retries")
except Exception as e:
    print(f"Query failed: {e}")

# Check timeout statistics
stats = timeout_handler.get_timeout_stats()
print(f"Timeout statistics: {stats}")

Install with Tessl CLI

npx tessl i tessl/pypi-cassandra-driver

docs

async-io.md

auth-policies.md

cluster-session.md

cql-types.md

cqlengine-orm.md

index.md

metadata.md

query-execution.md

tile.json