# Apache Airflow Providers Apache Cassandra

An Apache Airflow provider package enabling integration with Apache Cassandra, a highly scalable NoSQL database. This provider allows you to build data pipelines that interact with Cassandra clusters, including connection management, table and record monitoring, and workflow orchestration.

## Package Information

- **Package Name**: apache-airflow-providers-apache-cassandra
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-cassandra`
- **Requirements**: Apache Airflow 2.10.0+, Python 3.10+, cassandra-driver>=3.29.1

## Core Imports

```python
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor


# Use the hook for direct database operations
def check_cassandra_data():
    hook = CassandraHook(cassandra_conn_id="cassandra_default")
    session = hook.get_conn()

    # Check if a table exists
    if hook.table_exists("keyspace.users"):
        print("Table exists")

    # Check if a specific record exists
    exists = hook.record_exists("keyspace.users", {"user_id": "12345"})
    print(f"Record exists: {exists}")

    hook.shutdown_cluster()


# Use sensors in a DAG workflow
with DAG("cassandra_example", start_date=datetime(2024, 1, 1)) as dag:
    # Wait for the table to be created
    table_sensor = CassandraTableSensor(
        task_id="wait_for_table",
        table="keyspace.users",
        cassandra_conn_id="cassandra_default",
    )

    # Wait for a specific record to appear
    record_sensor = CassandraRecordSensor(
        task_id="wait_for_record",
        table="keyspace.users",
        keys={"user_id": "12345", "email": "user@example.com"},
        cassandra_conn_id="cassandra_default",
    )

    table_sensor >> record_sensor
```

## Architecture

The Airflow Cassandra provider follows the standard Airflow pattern, with a hook for connection management and sensors for workflow orchestration:

- **CassandraHook**: Manages cluster connections and authentication and provides methods for database operations. Handles connection pooling, SSL configuration, and a choice of load balancing policies for high availability.
- **Sensors**: `BaseSensorOperator` implementations that poll Cassandra for specific conditions (table existence, record availability) and integrate into DAG workflows with standard retry and timeout behavior.
- **Connection Management**: Uses Airflow's connection system with explicit cluster lifecycle management; connections are established on demand and should be shut down explicitly to prevent resource leaks (see the sketch below).

The design enables scalable data pipeline orchestration while maintaining proper resource management and connection handling for production Cassandra deployments.

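
The connection lifecycle point above can be made concrete with a short sketch of a task callable that opens a session through the hook, runs a query, and always shuts the cluster down; the connection ID, keyspace, table, and query are illustrative placeholders.

```python
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook


def count_users():
    # Hypothetical connection ID, keyspace, and table.
    hook = CassandraHook(cassandra_conn_id="cassandra_default")
    try:
        session = hook.get_conn()  # active cassandra-driver Session
        row = session.execute("SELECT count(*) FROM analytics.users").one()
        return row[0]
    finally:
        # Release sockets and pooled connections on success or failure.
        hook.shutdown_cluster()
```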

## Connection Configuration

This provider uses Airflow connections with the connection type `cassandra`. Configure them in the Airflow UI or programmatically:

- **Connection Type**: `cassandra`
- **Host**: Comma-separated list of contact points (e.g., "host1,host2,host3")
- **Port**: Cassandra port (default: 9042)
- **Login**: Username (optional)
- **Password**: Password (optional)
- **Schema**: Default keyspace (optional)
- **Extra**: JSON configuration for advanced options

Example Extra configuration:

```json
{
  "ssl_options": {"ca_certs": "/path/to/ca_certs"},
  "load_balancing_policy": "DCAwareRoundRobinPolicy",
  "load_balancing_policy_args": {
    "local_dc": "datacenter1",
    "used_hosts_per_remote_dc": 2
  },
  "cql_version": "3.4.4",
  "protocol_version": 4
}
```
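
For the programmatic route, one option is to register the connection in the metadata database with Airflow's `Connection` model. This is a minimal sketch; the connection ID, hosts, credentials, and keyspace are placeholders, and the Extra field carries the advanced options shown above.

```python
import json

from airflow import settings
from airflow.models import Connection

# Hypothetical Cassandra connection definition.
conn = Connection(
    conn_id="cassandra_prod",
    conn_type="cassandra",
    host="host1,host2,host3",   # contact points
    port=9042,
    login="app_user",
    password="app_password",
    schema="analytics",         # default keyspace
    extra=json.dumps({"protocol_version": 4}),
)

# Persist the connection in the Airflow metadata database.
session = settings.Session()
session.add(conn)
session.commit()
```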

## Capabilities

### Database Connection Hook

Manages connections to Cassandra clusters with support for authentication, SSL, load balancing policies, and connection pooling.

```python { .api }
class CassandraHook(BaseHook, LoggingMixin):
    """
    Hook for interacting with Apache Cassandra clusters.

    Supports contact points configuration, authentication, SSL, and various
    load balancing policies including DCAwareRoundRobinPolicy,
    WhiteListRoundRobinPolicy, and TokenAwarePolicy.
    """

    conn_name_attr = "cassandra_conn_id"
    default_conn_name = "cassandra_default"
    conn_type = "cassandra"
    hook_name = "Cassandra"

    def __init__(self, cassandra_conn_id: str = default_conn_name):
        """
        Initialize CassandraHook with connection configuration.

        Args:
            cassandra_conn_id (str): Airflow connection ID for Cassandra
        """

    def get_conn(self) -> Session:
        """
        Return a Cassandra Session object.

        Returns:
            Session: Active Cassandra session for executing queries
        """

    def get_cluster(self) -> Cluster:
        """
        Return the Cassandra cluster object.

        Returns:
            Cluster: Cassandra cluster instance
        """

    def shutdown_cluster(self) -> None:
        """
        Close all sessions and connections associated with this cluster.

        Call this method to properly clean up resources.
        """

    def table_exists(self, table: str) -> bool:
        """
        Check if a table exists in Cassandra.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table")

        Returns:
            bool: True if the table exists, False otherwise
        """

    def record_exists(self, table: str, keys: dict[str, str]) -> bool:
        """
        Check if a record exists in Cassandra based on primary key values.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table").
                Input is sanitized to prevent injection attacks.
            keys (dict[str, str]): Primary key column names and values.
                Used to construct the WHERE clause conditions.

        Returns:
            bool: True if the record exists, False otherwise.
                Returns False on any query execution error.

        Note:
            This method sanitizes input to prevent CQL injection: table and
            keyspace names must match the ^\w+$ pattern. Query errors are
            caught and return False rather than raising exceptions.
        """

    @staticmethod
    def get_lb_policy(policy_name: str, policy_args: dict[str, Any]) -> Policy:
        """
        Create a load balancing policy for the cluster connection.

        Args:
            policy_name (str): Policy type ("DCAwareRoundRobinPolicy",
                "WhiteListRoundRobinPolicy", "TokenAwarePolicy",
                or "RoundRobinPolicy"). Falls back to RoundRobinPolicy
                for unrecognized policy names.
            policy_args (dict): Policy-specific configuration parameters:
                - DCAwareRoundRobinPolicy: local_dc (str), used_hosts_per_remote_dc (int)
                - WhiteListRoundRobinPolicy: hosts (list) - required
                - TokenAwarePolicy: child_load_balancing_policy (str),
                  child_load_balancing_policy_args (dict)

        Returns:
            Policy: Configured load balancing policy instance. Returns RoundRobinPolicy
                as a fallback for invalid configurations.

        Raises:
            ValueError: When required parameters are missing (e.g., hosts for WhiteListRoundRobinPolicy)
        """
```
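
To illustrate the policy factory documented above, the sketch below builds a `TokenAwarePolicy` that wraps a `DCAwareRoundRobinPolicy` from plain configuration values; the data center name is a placeholder.

```python
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

# Construct a token-aware policy around a DC-aware child policy.
policy = CassandraHook.get_lb_policy(
    "TokenAwarePolicy",
    {
        "child_load_balancing_policy": "DCAwareRoundRobinPolicy",
        "child_load_balancing_policy_args": {
            "local_dc": "datacenter1",
            "used_hosts_per_remote_dc": 2,
        },
    },
)
print(type(policy).__name__)  # TokenAwarePolicy
```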

### Table Existence Sensor

Monitors a Cassandra cluster and waits for a specific table to be created, which is useful for orchestrating workflows that depend on schema changes.

```python { .api }
class CassandraTableSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a table in a Cassandra cluster.

    Inherits standard sensor behavior with poke interval, timeout, and
    retry capabilities. Useful for waiting on schema migrations or
    table creation tasks.
    """

    template_fields = ("table",)

    def __init__(
        self,
        *,
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the table sensor.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table")
            cassandra_conn_id (str): Airflow connection ID for Cassandra
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check if the specified table exists in Cassandra.

        Args:
            context (Context): Airflow task execution context

        Returns:
            bool: True if the table exists (sensor succeeds), False to continue waiting
        """
```
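
Because the sensor accepts the standard `BaseSensorOperator` keyword arguments, long waits can run in reschedule mode so a worker slot is not held between pokes; the task ID, table, and connection ID below are placeholders.

```python
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

wait_for_schema = CassandraTableSensor(
    task_id="wait_for_schema",
    table="analytics.daily_rollup",        # keyspace.table to watch for
    cassandra_conn_id="cassandra_default",
    mode="reschedule",                     # free the worker slot between checks
    poke_interval=120,                     # poke every 2 minutes
    timeout=6 * 60 * 60,                   # give up after 6 hours
)
```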

### Record Existence Sensor

Monitors a Cassandra cluster and waits for a specific record to appear, enabling data-driven workflow orchestration and event-based pipeline triggers.

```python { .api }
class CassandraRecordSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a record in a Cassandra cluster.

    Monitors for records based on primary key values, supporting composite
    keys. Useful for triggering downstream tasks when specific data
    becomes available.
    """

    template_fields = ("table", "keys")

    def __init__(
        self,
        *,
        keys: dict[str, str],
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the record sensor.

        Args:
            keys (dict[str, str]): Primary key column names and values to monitor.
                All specified keys must match for the record to be found.
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table")
            cassandra_conn_id (str): Airflow connection ID for Cassandra
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check if a record with the specified key values exists in Cassandra.

        Args:
            context (Context): Airflow task execution context

        Returns:
            bool: True if the record exists (sensor succeeds), False to continue waiting
        """
```

## Types

```python { .api }
from cassandra.cluster import Cluster, Session
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
    DCAwareRoundRobinPolicy,
    RoundRobinPolicy,
    TokenAwarePolicy,
    WhiteListRoundRobinPolicy,
)
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin
from collections.abc import Sequence
from typing import Any, TypeAlias, TYPE_CHECKING

# Load balancing policy type alias
Policy: TypeAlias = (
    DCAwareRoundRobinPolicy
    | RoundRobinPolicy
    | TokenAwarePolicy
    | WhiteListRoundRobinPolicy
)
```

## Version Compatibility

The package includes version compatibility utilities for different Airflow versions:

```python { .api }
# Version compatibility constants
AIRFLOW_V_3_0_PLUS: bool  # True if running on Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if running on Airflow 3.1+

# Compatibility imports (automatically selected based on the Airflow version)
BaseHook            # Base class for hooks
BaseSensorOperator  # Base class for sensors
```
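
As a rough illustration only (not this provider's actual implementation), such flags are typically derived by parsing the installed Airflow version and then used to choose between import locations that differ across major versions.

```python
from airflow import __version__ as airflow_version
from packaging.version import Version

# Derive the flags from the running Airflow version (illustrative sketch).
_base = Version(Version(airflow_version).base_version)
AIRFLOW_V_3_0_PLUS = _base >= Version("3.0.0")
AIRFLOW_V_3_1_PLUS = _base >= Version("3.1.0")

if not AIRFLOW_V_3_0_PLUS:
    # Classic Airflow 2.x locations for the compatibility base classes.
    from airflow.hooks.base import BaseHook
    from airflow.sensors.base import BaseSensorOperator
```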

## Usage Examples

### Waiting for Table Creation
```python
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

# Wait for a table to be created before proceeding
table_sensor = CassandraTableSensor(
    task_id="wait_for_user_table",
    table="production.users",      # keyspace.table format
    cassandra_conn_id="prod_cassandra",
    poke_interval=30,              # Check every 30 seconds
    timeout=3600,                  # Timeout after 1 hour
)
```

### Monitoring Record Availability
```python
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor

# Wait for a specific user record to be inserted
record_sensor = CassandraRecordSensor(
    task_id="wait_for_user_data",
    table="production.users",
    keys={
        "user_id": "{{ ds }}",  # Template support for dynamic values
        "region": "us-east-1",
    },
    cassandra_conn_id="prod_cassandra",
    poke_interval=60,
    timeout=7200,
)
```

### Custom Load Balancing
```python
import json

from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

# Place this JSON in the Extra field of the "cassandra_with_lb_policy"
# connection; the hook applies it when it builds the cluster.
connection_extra = json.dumps(
    {
        "load_balancing_policy": "DCAwareRoundRobinPolicy",
        "load_balancing_policy_args": {
            "local_dc": "datacenter1",
            "used_hosts_per_remote_dc": 2,
        },
    }
)

hook = CassandraHook("cassandra_with_lb_policy")
session = hook.get_conn()  # cluster is created with the configured policy
# Use the session for queries, then release resources
hook.shutdown_cluster()
```

### Error Handling
```python
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook


def safe_cassandra_operation():
    hook = None
    try:
        hook = CassandraHook("cassandra_default")

        if not hook.table_exists("keyspace.required_table"):
            raise ValueError("Required table does not exist")

        # record_exists itself swallows query errors and returns False
        return hook.record_exists("keyspace.users", {"id": "123"})

    except Exception as e:
        print(f"Cassandra operation failed: {e}")
        return False
    finally:
        # Shut the cluster down even when the checks fail
        if hook:
            hook.shutdown_cluster()
```