tessl/pypi-apache-airflow-providers-apache-cassandra

Apache Airflow provider package enabling integration with Apache Cassandra for orchestrating data pipelines

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-providers-apache-cassandra@3.8.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-cassandra@3.8.0


Apache Airflow Providers Apache Cassandra

An Apache Airflow provider package for Apache Cassandra, a highly scalable NoSQL database. It supplies a hook that manages connections to Cassandra clusters and sensors that wait for tables or records to appear, so DAGs can orchestrate data pipelines around Cassandra.

Package Information

  • Package Name: apache-airflow-providers-apache-cassandra
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-cassandra
  • Requirements: Apache Airflow 2.10.0+, Python 3.10+, cassandra-driver>=3.29.1

Core Imports

from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

Basic Usage

from airflow import DAG
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from datetime import datetime

# Use the hook for direct database checks
def check_cassandra_data():
    hook = CassandraHook(cassandra_conn_id="cassandra_default")
    try:
        # Check if the table exists
        if hook.table_exists("keyspace.users"):
            print("Table exists")

        # Check if a specific record exists (keys are primary key columns)
        exists = hook.record_exists("keyspace.users", {"user_id": "12345"})
        print(f"Record exists: {exists}")
    finally:
        # Always release the cluster's sessions and connections
        hook.shutdown_cluster()

# Use sensors in a DAG workflow
with DAG(
    "cassandra_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,   # trigger manually
    catchup=False,
) as dag:

    # Wait for the table to be created
    table_sensor = CassandraTableSensor(
        task_id="wait_for_table",
        table="keyspace.users",
        cassandra_conn_id="cassandra_default",
    )

    # Wait for a specific record to appear; all keys must match
    record_sensor = CassandraRecordSensor(
        task_id="wait_for_record",
        table="keyspace.users",
        keys={"user_id": "12345", "email": "user@example.com"},
        cassandra_conn_id="cassandra_default",
    )

    table_sensor >> record_sensor

Architecture

The Airflow Cassandra provider follows the standard Airflow pattern: a hook for connection management and sensors for workflow orchestration.

  • CassandraHook: Builds a cassandra-driver Cluster from the Airflow connection (contact points, port, authentication, SSL options, load balancing policy, CQL and protocol versions) and provides helpers such as table_exists and record_exists. Per-host connection pooling is handled by the underlying driver.
  • Sensors: BaseSensorOperator subclasses that poll Cassandra for a condition (table existence, record availability) and integrate into DAG workflows with the standard poke interval, timeout, and retry behavior.
  • Connection Management: Uses Airflow's connection system. A session is opened on demand by get_conn(), and shutdown_cluster() should be called explicitly to release driver connections and avoid resource leaks.

Keeping connection setup in the hook and polling logic in the sensors lets DAGs wait on Cassandra schema and data conditions while driver resources stay under explicit control.
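Because sessions should be shut down explicitly, it can help to pair get_conn() and shutdown_cluster() in one place. The following is a minimal sketch of a context manager for that; the helper name cassandra_session and the SupportsCassandraSession protocol are hypothetical and not part of the provider. It accepts any object exposing those two methods, which a CassandraHook instance does.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Protocol


class SupportsCassandraSession(Protocol):
    """Anything with the two CassandraHook methods used below (hypothetical)."""

    def get_conn(self) -> Any: ...

    def shutdown_cluster(self) -> None: ...


@contextmanager
def cassandra_session(hook: SupportsCassandraSession) -> Iterator[Any]:
    """Yield hook.get_conn() and always call hook.shutdown_cluster() afterwards.

    Hypothetical helper, not part of the provider.
    """
    try:
        yield hook.get_conn()
    finally:
        # Runs on normal exit, when the with-block raises, and if get_conn() fails
        hook.shutdown_cluster()
```

Usage would look like `with cassandra_session(CassandraHook("cassandra_default")) as session:` followed by `session.execute("SELECT release_version FROM system.local")`; the cluster is shut down when the block exits, even on error.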

Connection Configuration

This provider uses Airflow connections with connection type cassandra. Configure them in the Airflow UI or programmatically, using these fields:

  • Connection Type: cassandra
  • Host: Comma-separated list of contact points (e.g., "host1,host2,host3")
  • Port: Cassandra port (default: 9042)
  • Login: Username (optional)
  • Password: Password (optional)
  • Schema: Default keyspace (optional)
  • Extra: JSON configuration for advanced options

Example Extra configuration:

{
  "ssl_options": {"ca_certs": "/path/to/ca_certs"},
  "load_balancing_policy": "DCAwareRoundRobinPolicy",
  "load_balancing_policy_args": {
    "local_dc": "datacenter1",
    "used_hosts_per_remote_dc": 2
  },
  "cql_version": "3.4.4",
  "protocol_version": 4
}
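Besides the UI, Airflow can read a connection from an AIRFLOW_CONN_<CONN_ID> environment variable, and since Airflow 2.3 the value may be a JSON document. This sketch defines cassandra_default that way, reusing the Extra options above; the host names, credentials, and keyspace are placeholders. Setting os.environ only affects the current process and its children, so in a real deployment you would export the same JSON in the scheduler and worker environments.

```python
import json
import os

# Extra options, as in the example above
extra = {
    "ssl_options": {"ca_certs": "/path/to/ca_certs"},
    "load_balancing_policy": "DCAwareRoundRobinPolicy",
    "load_balancing_policy_args": {"local_dc": "datacenter1", "used_hosts_per_remote_dc": 2},
    "cql_version": "3.4.4",
    "protocol_version": 4,
}

# Placeholder connection values; replace with your cluster's details
connection = {
    "conn_type": "cassandra",
    "host": "host1,host2,host3",  # comma-separated contact points
    "port": 9042,
    "login": "cassandra_user",
    "password": "cassandra_password",
    "schema": "my_keyspace",  # default keyspace
    "extra": extra,
}

# Variable name is AIRFLOW_CONN_ plus the upper-cased conn_id ("cassandra_default")
os.environ["AIRFLOW_CONN_CASSANDRA_DEFAULT"] = json.dumps(connection)
```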

Capabilities

Database Connection Hook

Manages connections to Cassandra clusters with support for authentication, SSL, load balancing policies, and connection pooling.

class CassandraHook(BaseHook, LoggingMixin):
    """
    Hook for interacting with Apache Cassandra clusters.
    
    Supports contact points configuration, authentication, SSL, and various
    load balancing policies including DCAwareRoundRobinPolicy, 
    WhiteListRoundRobinPolicy, and TokenAwarePolicy.
    """
    
    conn_name_attr = "cassandra_conn_id"
    default_conn_name = "cassandra_default"
    conn_type = "cassandra"
    hook_name = "Cassandra"
    
    def __init__(self, cassandra_conn_id: str = default_conn_name):
        """
        Initialize CassandraHook with connection configuration.
        
        Args:
            cassandra_conn_id (str): Airflow connection ID for Cassandra
        """
    
    def get_conn(self) -> Session:
        """
        Return a cassandra Session object.
        
        Returns:
            Session: Active Cassandra session for executing queries
        """
    
    def get_cluster(self) -> Cluster:
        """
        Return Cassandra cluster object.
        
        Returns:
            Cluster: Cassandra cluster instance
        """
    
    def shutdown_cluster(self) -> None:
        """
        Close all sessions and connections associated with this Cluster.
        Call this method to properly clean up resources.
        """
    
    def table_exists(self, table: str) -> bool:
        """
        Check if a table exists in Cassandra.
        
        Args:
            table (str): Target table name. Use dot notation for 
                        specific keyspace (e.g., "keyspace.table")
        
        Returns:
            bool: True if table exists, False otherwise
        """
    
    def record_exists(self, table: str, keys: dict[str, str]) -> bool:
        """
        Check if a record exists in Cassandra based on primary key values.
        
        Args:
            table (str): Target table name. Use dot notation for 
                        specific keyspace (e.g., "keyspace.table").
                        Input is sanitized to prevent injection attacks.
            keys (dict[str, str]): Primary key column names and values.
                                  Used to construct WHERE clause conditions.
        
        Returns:
            bool: True if record exists, False otherwise.
                 Returns False on any query execution errors.
        
        Note:
            Table and keyspace names must match the ^\w+$ pattern, and key
            values are passed to the driver as bound parameters, which guards
            against CQL injection. Query execution errors are caught and
            return False rather than raising exceptions.
        """
    
    @staticmethod
    def get_lb_policy(policy_name: str, policy_args: dict[str, Any]) -> Policy:
        """
        Create load balancing policy for cluster connection.
        
        Args:
            policy_name (str): Policy type ("DCAwareRoundRobinPolicy", 
                              "WhiteListRoundRobinPolicy", "TokenAwarePolicy", 
                              or "RoundRobinPolicy"). Falls back to RoundRobinPolicy
                              for unrecognized policy names.
            policy_args (dict): Policy-specific configuration parameters:
                - DCAwareRoundRobinPolicy: local_dc (str), used_hosts_per_remote_dc (int)
                - WhiteListRoundRobinPolicy: hosts (list) - required
                - TokenAwarePolicy: child_load_balancing_policy (str), 
                  child_load_balancing_policy_args (dict)
        
        Returns:
            Policy: Configured load balancing policy instance. Unrecognized
                   policy names yield RoundRobinPolicy; a missing hosts list for
                   WhiteListRoundRobinPolicy raises ValueError instead.
        
        Raises:
            ValueError: When required parameters are missing (e.g., hosts for WhiteListRoundRobinPolicy)
        """

Table Existence Sensor

Monitors Cassandra clusters waiting for specific tables to be created, useful for orchestrating workflows that depend on schema changes.

class CassandraTableSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a table in a Cassandra cluster.
    
    Inherits standard sensor behavior with poke interval, timeout, and 
    retry capabilities. Useful for waiting on schema migrations or 
    table creation tasks.
    """
    
    template_fields = ("table",)
    
    def __init__(
        self,
        *,
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize table sensor.
        
        Args:
            table (str): Target table name. Use dot notation for 
                        specific keyspace (e.g., "keyspace.table") 
            cassandra_conn_id (str): Airflow connection ID for Cassandra
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """
    
    def poke(self, context: Context) -> bool:
        """
        Check if the specified table exists in Cassandra.
        
        Args:
            context (Context): Airflow task execution context
        
        Returns:
            bool: True if table exists (sensor succeeds), False to continue waiting
        """

Record Existence Sensor

Monitors Cassandra clusters waiting for specific records to appear, enabling data-driven workflow orchestration and event-based pipeline triggers.

class CassandraRecordSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a record in a Cassandra cluster.
    
    Monitors for records based on primary key values, supporting complex
    composite keys. Useful for triggering downstream tasks when specific
    data becomes available.
    """
    
    template_fields = ("table", "keys")
    
    def __init__(
        self,
        *,
        keys: dict[str, str],
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize record sensor.
        
        Args:
            keys (dict[str, str]): Primary key column names and values to monitor.
                                  All specified keys must match for record to be found.
            table (str): Target table name. Use dot notation for 
                        specific keyspace (e.g., "keyspace.table")
            cassandra_conn_id (str): Airflow connection ID for Cassandra
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """
    
    def poke(self, context: Context) -> bool:
        """
        Check if record with specified key values exists in Cassandra.
        
        Args:
            context (Context): Airflow task execution context
        
        Returns:
            bool: True if record exists (sensor succeeds), False to continue waiting
        """

Types

from cassandra.cluster import Cluster, Session
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
    DCAwareRoundRobinPolicy, 
    RoundRobinPolicy, 
    TokenAwarePolicy, 
    WhiteListRoundRobinPolicy
)
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin
from collections.abc import Sequence
from typing import Any, TypeAlias, TYPE_CHECKING

# Load balancing policy type alias
Policy: TypeAlias = (
    DCAwareRoundRobinPolicy | 
    RoundRobinPolicy | 
    TokenAwarePolicy | 
    WhiteListRoundRobinPolicy
)

Version Compatibility

The package includes version compatibility utilities for different Airflow versions:

# Version compatibility constants
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Compatibility imports (automatically selected based on Airflow version)
BaseHook  # Base class for hooks
BaseSensorOperator  # Base class for sensors
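Flags like AIRFLOW_V_3_0_PLUS are typically derived by comparing the installed Airflow version against a threshold. Here is a stdlib-only sketch of that comparison; the helpers base_version_tuple and airflow_version_at_least are hypothetical, and the provider's own module may use packaging.version rather than the simple parsing shown here.

```python
import re
from typing import Tuple


def base_version_tuple(version: str) -> Tuple[int, int, int]:
    """Extract (major, minor, micro) from strings like "3.0.2" or "3.1.0rc1"."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"Unrecognized version: {version!r}")
    major, minor, micro = match.groups(default="0")
    return int(major), int(minor), int(micro)


def airflow_version_at_least(version: str, major: int, minor: int = 0) -> bool:
    """True when the base version is >= major.minor.0 (pre-release suffixes ignored)."""
    return base_version_tuple(version) >= (major, minor, 0)
```

A flag would then be computed as `AIRFLOW_V_3_0_PLUS = airflow_version_at_least(airflow.__version__, 3, 0)`.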

Usage Examples

Waiting for Table Creation

from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

# Wait for a table to be created before proceeding
table_sensor = CassandraTableSensor(
    task_id="wait_for_user_table",
    table="production.users",  # keyspace.table format
    cassandra_conn_id="prod_cassandra",
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # Timeout after 1 hour
)

Monitoring Record Availability

from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor

# Wait for specific user record to be inserted
record_sensor = CassandraRecordSensor(
    task_id="wait_for_user_data",
    table="production.users",
    keys={
        "user_id": "{{ ds }}",  # Template support for dynamic values
        "region": "us-east-1"
    },
    cassandra_conn_id="prod_cassandra",
    poke_interval=60,
    timeout=7200,
)

Custom Load Balancing

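from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

# Load balancing options belong in the Extra field (as JSON) of the Airflow
# connection "cassandra_with_lb_policy"; the hook reads them from there
connection_extra = {
    "load_balancing_policy": "DCAwareRoundRobinPolicy",
    "load_balancing_policy_args": {
        "local_dc": "datacenter1",
        "used_hosts_per_remote_dc": 2
    }
}

hook = CassandraHook("cassandra_with_lb_policy")
try:
    session = hook.get_conn()
    # Use session for queries
finally:
    hook.shutdown_cluster()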
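The get_lb_policy rules from the hook reference (DCAware arguments, mandatory hosts for the whitelist policy, TokenAware wrapping a child policy, RoundRobin fallback) can be traced without a cluster. This stdlib-only sketch returns a readable description instead of real cassandra.policies objects. The function describe_lb_policy is hypothetical, and three details are assumptions: the defaults for missing DCAware arguments (local_dc="", used_hosts_per_remote_dc=0), RoundRobinPolicy as the default child of TokenAwarePolicy, and the replacement of an unsupported child policy with RoundRobinPolicy.

```python
from typing import Any, Dict, Optional

# Child policies TokenAwarePolicy may wrap in this sketch (assumption)
KNOWN_CHILD_POLICIES = {"RoundRobinPolicy", "DCAwareRoundRobinPolicy", "WhiteListRoundRobinPolicy"}


def describe_lb_policy(policy_name: Optional[str], policy_args: Dict[str, Any]) -> str:
    """Return a string naming the policy the documented rules would select."""
    if policy_name == "DCAwareRoundRobinPolicy":
        local_dc = policy_args.get("local_dc", "")  # assumed default
        remote = int(policy_args.get("used_hosts_per_remote_dc", 0))  # assumed default
        return f"DCAwareRoundRobinPolicy(local_dc={local_dc!r}, used_hosts_per_remote_dc={remote})"
    if policy_name == "WhiteListRoundRobinPolicy":
        hosts = policy_args.get("hosts")
        if not hosts:
            raise ValueError("hosts is required for WhiteListRoundRobinPolicy")
        return f"WhiteListRoundRobinPolicy(hosts={list(hosts)!r})"
    if policy_name == "TokenAwarePolicy":
        child_name = policy_args.get("child_load_balancing_policy", "RoundRobinPolicy")
        child_args = policy_args.get("child_load_balancing_policy_args", {})
        if child_name not in KNOWN_CHILD_POLICIES:
            child_name, child_args = "RoundRobinPolicy", {}
        return f"TokenAwarePolicy({describe_lb_policy(child_name, child_args)})"
    # Unrecognized or missing names fall back to plain round-robin
    return "RoundRobinPolicy()"
```

For the connection_extra above, `describe_lb_policy(connection_extra["load_balancing_policy"], connection_extra["load_balancing_policy_args"])` describes a DCAwareRoundRobinPolicy with local_dc 'datacenter1' and 2 remote hosts per DC.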

Error Handling

from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

def safe_cassandra_operation():
    hook = None
    try:
        hook = CassandraHook("cassandra_default")
        
        if not hook.table_exists("keyspace.required_table"):
            raise ValueError("Required table does not exist")
            
        record_exists = hook.record_exists("keyspace.users", {"id": "123"})
        return record_exists
        
    except Exception as e:
        print(f"Cassandra operation failed: {e}")
        return False
    finally:
        if hook:
            hook.shutdown_cluster()