Apache Airflow provider package enabling integration with Apache Cassandra for orchestrating data pipelines
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-cassandra@3.8.0

An Apache Airflow provider package enabling integration with Apache Cassandra, a highly scalable NoSQL database. This provider allows you to build data pipelines that interact with Cassandra clusters, including connection management, table and record monitoring, and workflow orchestration.

pip install apache-airflow-providers-apache-cassandra

from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

from airflow import DAG
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from datetime import datetime


# Use hook for direct database operations
def check_cassandra_data():
    """Print whether ``keyspace.users`` exists and whether user 12345 is in it.

    The cluster is shut down in a ``finally`` block, so its sessions and
    connections are released even when one of the checks raises.
    """
    hook = CassandraHook(cassandra_conn_id="cassandra_default")
    try:
        # Open the session up front; the checks below go through the hook.
        hook.get_conn()
        # Check if table exists
        if hook.table_exists("keyspace.users"):
            print("Table exists")
        # Check if specific record exists
        exists = hook.record_exists("keyspace.users", {"user_id": "12345"})
        print(f"Record exists: {exists}")
    finally:
        # Always release cluster resources, even on failure.
        hook.shutdown_cluster()
# Use sensors in DAG workflow
with DAG("cassandra_example", start_date=datetime(2024, 1, 1)) as dag:
# Wait for table to be created
table_sensor = CassandraTableSensor(
task_id="wait_for_table",
table="keyspace.users",
cassandra_conn_id="cassandra_default"
)
# Wait for specific record to appear
record_sensor = CassandraRecordSensor(
task_id="wait_for_record",
table="keyspace.users",
keys={"user_id": "12345", "email": "user@example.com"},
cassandra_conn_id="cassandra_default"
)
table_sensor >> record_sensorThe Airflow Cassandra provider follows the standard Airflow pattern with hooks for connection management and sensors for workflow orchestration:
This split keeps connection handling, including cleanup via `shutdown_cluster()`, inside the hook. Sensors and tasks therefore only need an Airflow connection ID.

This provider uses Airflow connections of type `cassandra`. Configure them in the Airflow UI or programmatically.

Connection type: cassandra

Example Extra configuration:
{
"ssl_options": {"ca_certs": "/path/to/ca_certs"},
"load_balancing_policy": "DCAwareRoundRobinPolicy",
"load_balancing_policy_args": {
"local_dc": "datacenter1",
"used_hosts_per_remote_dc": 2
},
"cql_version": "3.4.4",
"protocol_version": 4
}

CassandraHook manages connections to Cassandra clusters with support for authentication, SSL, load balancing policies, and connection pooling.
class CassandraHook(BaseHook, LoggingMixin):
    """
    Hook for interacting with Apache Cassandra clusters.

    Supports contact points configuration, authentication, SSL, and various
    load balancing policies including DCAwareRoundRobinPolicy,
    WhiteListRoundRobinPolicy, and TokenAwarePolicy.

    Connection settings come from the Airflow connection named by
    ``cassandra_conn_id``. Call ``shutdown_cluster()`` when done, ideally in a
    ``finally`` block, to release the cluster's sessions and connections.
    """

    # Name of the connection-id argument (matches the __init__ parameter).
    conn_name_attr = "cassandra_conn_id"
    # Connection id used when the caller does not pass one.
    default_conn_name = "cassandra_default"
    # Airflow connection type handled by this hook.
    conn_type = "cassandra"
    # Display name of the hook.
    hook_name = "Cassandra"

    def __init__(self, cassandra_conn_id: str = default_conn_name) -> None:
        """
        Initialize CassandraHook with connection configuration.

        Args:
            cassandra_conn_id (str): Airflow connection ID for Cassandra.
                Defaults to ``default_conn_name`` ("cassandra_default").
        """

    def get_conn(self) -> Session:
        """
        Return a Cassandra Session object.

        Returns:
            Session: Active Cassandra session for executing queries.
        """

    def get_cluster(self) -> Cluster:
        """
        Return the Cassandra Cluster object.

        Returns:
            Cluster: Cassandra cluster instance.
        """

    def shutdown_cluster(self) -> None:
        """
        Close all sessions and connections associated with this Cluster.

        Call this method to properly clean up resources. Calling it from a
        ``finally`` block ensures cleanup also happens when a query fails.
        """

    def table_exists(self, table: str) -> bool:
        """
        Check if a table exists in Cassandra.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table").
                NOTE(review): the keyspace used when no prefix is given is
                not documented here (presumably the connection's default);
                confirm.

        Returns:
            bool: True if the table exists, False otherwise.
        """

    def record_exists(self, table: str, keys: dict[str, str]) -> bool:
        r"""
        Check if a record exists in Cassandra based on primary key values.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table"). Table and
                keyspace names must match the ``^\w+$`` pattern. This
                sanitization is what guards the query against CQL injection.
            keys (dict[str, str]): Primary key column names mapped to their
                values. Each pair becomes a WHERE-clause condition, so all
                of them must match for the record to be found.

        Returns:
            bool: True if a matching record exists, False otherwise.
            Query execution errors are caught and also reported as False.
            A query that Cassandra rejects is therefore indistinguishable
            from a record that is simply absent.

        Note:
            NOTE(review): only table and keyspace names are documented as
            sanitized. This docstring does not state whether the column names
            in ``keys`` are validated. It also does not state what happens
            when a name fails the ``^\w+$`` check (raise or return False).
            Confirm both before passing untrusted input.
        """

    @staticmethod
    def get_lb_policy(policy_name: str, policy_args: dict[str, Any]) -> Policy:
        """
        Create the load balancing policy for the cluster connection.

        Args:
            policy_name (str): Policy type: "DCAwareRoundRobinPolicy",
                "WhiteListRoundRobinPolicy", "TokenAwarePolicy" or
                "RoundRobinPolicy". Unrecognized names fall back to
                RoundRobinPolicy.
            policy_args (dict): Policy-specific parameters, such as the
                ``load_balancing_policy_args`` value of the connection Extra:
                - DCAwareRoundRobinPolicy: local_dc (str),
                  used_hosts_per_remote_dc (int)
                - WhiteListRoundRobinPolicy: hosts (list), required
                - TokenAwarePolicy: child_load_balancing_policy (str),
                  child_load_balancing_policy_args (dict)

        Returns:
            Policy: Configured load balancing policy instance, or
            RoundRobinPolicy when ``policy_name`` is not recognized.

        Raises:
            ValueError: When a required parameter is missing, e.g. ``hosts``
                for WhiteListRoundRobinPolicy. This case raises instead of
                falling back to RoundRobinPolicy.
        """

Monitors Cassandra clusters waiting for specific tables to be created, useful for orchestrating workflows that depend on schema changes.
class CassandraTableSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a table in a Cassandra cluster.

    Inherits standard sensor behavior with poke interval, timeout, and
    retry capabilities. Useful for waiting on schema migrations or
    table creation tasks.
    """

    # Attributes Airflow renders as Jinja templates before execution.
    template_fields = ("table",)

    def __init__(
        self,
        *,
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize table sensor.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table"). Templated.
            cassandra_conn_id (str): Airflow connection ID for Cassandra.
                Defaults to ``CassandraHook.default_conn_name``.
            **kwargs: Additional sensor parameters (poke_interval, timeout,
                etc.).
        """

    def poke(self, context: Context) -> bool:
        """
        Check once whether the configured table exists in Cassandra.

        Args:
            context (Context): Airflow task execution context.

        Returns:
            bool: True if the table exists (sensor succeeds), False to
            continue waiting.
        """

Monitors Cassandra clusters waiting for specific records to appear, enabling data-driven workflow orchestration and event-based pipeline triggers.
class CassandraRecordSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a record in a Cassandra cluster.

    Monitors for records based on primary key values, supporting complex
    composite keys. Useful for triggering downstream tasks when specific
    data becomes available.
    """

    # Attributes Airflow renders as Jinja templates before execution; both
    # the table name and the key values may use templates such as "{{ ds }}".
    template_fields = ("table", "keys")

    def __init__(
        self,
        *,
        keys: dict[str, str],
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize record sensor.

        Args:
            keys (dict[str, str]): Primary key column names and values to
                monitor. All specified keys must match for the record to be
                found. Templated.
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table"). Templated.
            cassandra_conn_id (str): Airflow connection ID for Cassandra.
                Defaults to ``CassandraHook.default_conn_name``.
            **kwargs: Additional sensor parameters (poke_interval, timeout,
                etc.).
        """

    def poke(self, context: Context) -> bool:
        """
        Check once whether a record with the configured key values exists.

        Args:
            context (Context): Airflow task execution context.

        Returns:
            bool: True if the record exists (sensor succeeds), False to
            continue waiting.
        """

from cassandra.cluster import Cluster, Session
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
DCAwareRoundRobinPolicy,
RoundRobinPolicy,
TokenAwarePolicy,
WhiteListRoundRobinPolicy
)
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin
from collections.abc import Sequence
from typing import Any, TypeAlias, TYPE_CHECKING
# Load balancing policy type alias
Policy: TypeAlias = (
DCAwareRoundRobinPolicy |
RoundRobinPolicy |
TokenAwarePolicy |
WhiteListRoundRobinPolicy
)The package includes version compatibility utilities for different Airflow versions:
# Version compatibility constants
AIRFLOW_V_3_0_PLUS: bool # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool # True if Airflow 3.1+
# Compatibility imports (automatically selected based on Airflow version)
BaseHook # Base class for hooks
BaseSensorOperator # Base class for sensorsfrom airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
# Hold downstream tasks until production.users exists. The table is
# re-checked every 30 seconds, and the sensor gives up after one hour.
table_sensor = CassandraTableSensor(
    task_id="wait_for_user_table",
    cassandra_conn_id="prod_cassandra",
    table="production.users",  # "<keyspace>.<table>"
    poke_interval=30,  # seconds between checks
    timeout=3600,  # overall limit in seconds (1 hour)
)

from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
# Hold the pipeline until the expected user row has been written.
# Every column used in ``keys`` should belong to the table's primary key.
record_sensor = CassandraRecordSensor(
    task_id="wait_for_user_data",
    cassandra_conn_id="prod_cassandra",
    table="production.users",
    keys={
        # "keys" is a template field, so "{{ ds }}" is rendered at runtime.
        "user_id": "{{ ds }}",
        "region": "us-east-1",
    },
    poke_interval=60,  # seconds between checks
    timeout=7200,  # overall limit in seconds (2 hours)
)

# Configure connection with custom load balancing policy
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

# Content for the *Extra* field of the Airflow connection
# "cassandra_with_lb_policy" (set in the UI or programmatically).
# CassandraHook only accepts a connection ID, so the policy is read from that
# stored connection; defining this dict alone configures nothing.
connection_extra = {
    "load_balancing_policy": "DCAwareRoundRobinPolicy",
    "load_balancing_policy_args": {
        "local_dc": "datacenter1",
        "used_hosts_per_remote_dc": 2,
    },
}

hook = CassandraHook("cassandra_with_lb_policy")
try:
    session = hook.get_conn()
    # Use session for queries
finally:
    # Release cluster resources even if a query raises.
    hook.shutdown_cluster()

from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
def safe_cassandra_operation():
    """Return whether user "123" exists in ``keyspace.users``.

    Returns False, after logging the reason, when ``keyspace.required_table``
    is missing or when any error occurs while talking to Cassandra. The
    cluster is always shut down before returning.
    """
    import logging

    log = logging.getLogger(__name__)
    hook = None
    try:
        hook = CassandraHook("cassandra_default")
        if not hook.table_exists("keyspace.required_table"):
            # An expected condition, not an error: report it and bail out
            # rather than raising an exception only to catch it below.
            log.warning("Required table does not exist")
            return False
        return hook.record_exists("keyspace.users", {"id": "123"})
    except Exception:
        # Top-level boundary of this helper: keep the full traceback in the
        # task log instead of a one-line print, then degrade to False.
        log.exception("Cassandra operation failed")
        return False
    finally:
        # hook stays None if the constructor itself failed.
        if hook is not None:
            hook.shutdown_cluster()