
# Apache Airflow Providers Apache Cassandra

An Apache Airflow provider package enabling integration with Apache Cassandra, a highly scalable NoSQL database. This provider lets you build data pipelines that interact with Cassandra clusters, including connection management, table and record monitoring, and workflow orchestration.

## Package Information

- **Package Name**: apache-airflow-providers-apache-cassandra
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-cassandra`
- **Requirements**: Apache Airflow 2.10.0+, Python 3.10+, cassandra-driver>=3.29.1

## Core Imports

```python
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor
from datetime import datetime

# Use the hook for direct database operations
def check_cassandra_data():
    hook = CassandraHook(cassandra_conn_id="cassandra_default")
    session = hook.get_conn()

    # Check if table exists
    if hook.table_exists("keyspace.users"):
        print("Table exists")

    # Check if a specific record exists
    exists = hook.record_exists("keyspace.users", {"user_id": "12345"})
    print(f"Record exists: {exists}")

    hook.shutdown_cluster()

# Use sensors in a DAG workflow
with DAG("cassandra_example", start_date=datetime(2024, 1, 1)) as dag:

    # Wait for table to be created
    table_sensor = CassandraTableSensor(
        task_id="wait_for_table",
        table="keyspace.users",
        cassandra_conn_id="cassandra_default",
    )

    # Wait for a specific record to appear
    record_sensor = CassandraRecordSensor(
        task_id="wait_for_record",
        table="keyspace.users",
        keys={"user_id": "12345", "email": "user@example.com"},
        cassandra_conn_id="cassandra_default",
    )

    table_sensor >> record_sensor
```

## Architecture

The Airflow Cassandra provider follows the standard Airflow pattern, with hooks for connection management and sensors for workflow orchestration:

- **CassandraHook**: Manages cluster connections and authentication, and provides methods for database operations. Handles connection pooling, SSL configuration, and various load balancing policies for high availability.
- **Sensors**: BaseSensorOperator implementations that poll Cassandra for specific conditions (table existence, record availability) and integrate into DAG workflows with standard retry and timeout behavior.
- **Connection Management**: Uses Airflow's connection system with proper cluster lifecycle management: connections are established on demand and should be explicitly shut down to prevent resource leaks.

The design enables scalable data pipeline orchestration while maintaining proper resource management and connection best practices for production Cassandra deployments.

## Connection Configuration

This provider uses Airflow connections with connection type `cassandra`. Configure them in the Airflow UI or programmatically:

- **Connection Type**: `cassandra`
- **Host**: Comma-separated list of contact points (e.g., "host1,host2,host3")
- **Port**: Cassandra port (default: 9042)
- **Login**: Username (optional)
- **Password**: Password (optional)
- **Schema**: Default keyspace (optional)
- **Extra**: JSON configuration for advanced options

Example Extra configuration:

```json
{
  "ssl_options": {"ca_certs": "/path/to/ca_certs"},
  "load_balancing_policy": "DCAwareRoundRobinPolicy",
  "load_balancing_policy_args": {
    "local_dc": "datacenter1",
    "used_hosts_per_remote_dc": 2
  },
  "cql_version": "3.4.4",
  "protocol_version": 4
}
```
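Connections can also be supplied without the UI via Airflow's `AIRFLOW_CONN_{CONN_ID}` environment variables, which accept a JSON-serialized connection (Airflow 2.3+). A minimal sketch, using illustrative host and keyspace values:

```python
import json
import os

# Sketch: the same connection defined as an environment variable using
# Airflow's JSON connection serialization. Hosts, keyspace, and the
# connection ID "cassandra_default" are illustrative values.
extra = {
    "load_balancing_policy": "DCAwareRoundRobinPolicy",
    "load_balancing_policy_args": {
        "local_dc": "datacenter1",
        "used_hosts_per_remote_dc": 2,
    },
}

os.environ["AIRFLOW_CONN_CASSANDRA_DEFAULT"] = json.dumps({
    "conn_type": "cassandra",
    "host": "host1,host2,host3",
    "port": 9042,
    "schema": "my_keyspace",       # default keyspace
    "extra": json.dumps(extra),    # Extra is itself a JSON string
})
```

Note that the `extra` field is double-encoded: the connection is a JSON object whose `extra` value is a JSON string.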

## Capabilities

### Database Connection Hook

Manages connections to Cassandra clusters with support for authentication, SSL, load balancing policies, and connection pooling.

```python { .api }
class CassandraHook(BaseHook, LoggingMixin):
    """
    Hook for interacting with Apache Cassandra clusters.

    Supports contact points configuration, authentication, SSL, and various
    load balancing policies including DCAwareRoundRobinPolicy,
    WhiteListRoundRobinPolicy, and TokenAwarePolicy.
    """

    conn_name_attr = "cassandra_conn_id"
    default_conn_name = "cassandra_default"
    conn_type = "cassandra"
    hook_name = "Cassandra"

    def __init__(self, cassandra_conn_id: str = default_conn_name):
        """
        Initialize CassandraHook with connection configuration.

        Args:
            cassandra_conn_id (str): Airflow connection ID for Cassandra
        """

    def get_conn(self) -> Session:
        """
        Return a Cassandra Session object.

        Returns:
            Session: Active Cassandra session for executing queries
        """

    def get_cluster(self) -> Cluster:
        """
        Return the Cassandra cluster object.

        Returns:
            Cluster: Cassandra cluster instance
        """

    def shutdown_cluster(self) -> None:
        """
        Close all sessions and connections associated with this Cluster.

        Call this method to properly clean up resources.
        """

    def table_exists(self, table: str) -> bool:
        """
        Check if a table exists in Cassandra.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table")

        Returns:
            bool: True if the table exists, False otherwise
        """

    def record_exists(self, table: str, keys: dict[str, str]) -> bool:
        """
        Check if a record exists in Cassandra based on primary key values.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table").
                Input is sanitized to prevent injection attacks.
            keys (dict[str, str]): Primary key column names and values.
                Used to construct WHERE clause conditions.

        Returns:
            bool: True if the record exists, False otherwise.
                Returns False on any query execution errors.

        Note:
            This method sanitizes input to prevent CQL injection. Table and
            keyspace names must match the ^\w+$ pattern. Query errors are
            caught and return False rather than raising exceptions.
        """

    @staticmethod
    def get_lb_policy(policy_name: str, policy_args: dict[str, Any]) -> Policy:
        """
        Create a load balancing policy for the cluster connection.

        Args:
            policy_name (str): Policy type ("DCAwareRoundRobinPolicy",
                "WhiteListRoundRobinPolicy", "TokenAwarePolicy",
                or "RoundRobinPolicy"). Falls back to RoundRobinPolicy
                for unrecognized policy names.
            policy_args (dict): Policy-specific configuration parameters:
                - DCAwareRoundRobinPolicy: local_dc (str), used_hosts_per_remote_dc (int)
                - WhiteListRoundRobinPolicy: hosts (list) - required
                - TokenAwarePolicy: child_load_balancing_policy (str),
                  child_load_balancing_policy_args (dict)

        Returns:
            Policy: Configured load balancing policy instance. Returns RoundRobinPolicy
                as a fallback for invalid configurations.

        Raises:
            ValueError: When required parameters are missing (e.g., hosts for WhiteListRoundRobinPolicy)
        """
```
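The sanitization described for `record_exists` can be illustrated with a stdlib-only sketch. The helper below is hypothetical (not part of the provider's API) and only mirrors the documented behavior: identifiers must match `^\w+$`, and the keys become a parameterized WHERE clause using the driver's `%(name)s` placeholder style:

```python
import re

# Hypothetical sketch of the checks record_exists is documented to perform:
# identifier sanitization plus a parameterized WHERE clause.
def build_exists_query(table: str, keys: dict[str, str]) -> tuple[str, dict[str, str]]:
    keyspace, _, name = table.rpartition(".")
    # Reject any identifier that is not purely word characters (^\w+$)
    for identifier in filter(None, (keyspace, name, *keys)):
        if not re.fullmatch(r"\w+", identifier):
            raise ValueError(f"Invalid identifier: {identifier!r}")
    # Key values are bound as parameters, never interpolated into the CQL text
    where = " AND ".join(f"{column}=%({column})s" for column in keys)
    target = f"{keyspace}.{name}" if keyspace else name
    return f"SELECT * FROM {target} WHERE {where}", keys

query, params = build_exists_query("keyspace.users", {"user_id": "12345"})
print(query)  # SELECT * FROM keyspace.users WHERE user_id=%(user_id)s
```

Binding values as parameters rather than formatting them into the statement is what makes the documented "returns False instead of raising" behavior safe to rely on with untrusted key values.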

### Table Existence Sensor

Monitors Cassandra clusters waiting for specific tables to be created, which is useful for orchestrating workflows that depend on schema changes.

```python { .api }
class CassandraTableSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a table in a Cassandra cluster.

    Inherits standard sensor behavior with poke interval, timeout, and
    retry capabilities. Useful for waiting on schema migrations or
    table creation tasks.
    """

    template_fields = ("table",)

    def __init__(
        self,
        *,
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the table sensor.

        Args:
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table")
            cassandra_conn_id (str): Airflow connection ID for Cassandra
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check if the specified table exists in Cassandra.

        Args:
            context (Context): Airflow task execution context

        Returns:
            bool: True if the table exists (sensor succeeds), False to continue waiting
        """
```

### Record Existence Sensor

Monitors Cassandra clusters waiting for specific records to appear, enabling data-driven workflow orchestration and event-based pipeline triggers.

```python { .api }
class CassandraRecordSensor(BaseSensorOperator):
    """
    Sensor that checks for the existence of a record in a Cassandra cluster.

    Monitors for records based on primary key values, supporting complex
    composite keys. Useful for triggering downstream tasks when specific
    data becomes available.
    """

    template_fields = ("table", "keys")

    def __init__(
        self,
        *,
        keys: dict[str, str],
        table: str,
        cassandra_conn_id: str = CassandraHook.default_conn_name,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the record sensor.

        Args:
            keys (dict[str, str]): Primary key column names and values to monitor.
                All specified keys must match for the record to be found.
            table (str): Target table name. Use dot notation for a
                specific keyspace (e.g., "keyspace.table")
            cassandra_conn_id (str): Airflow connection ID for Cassandra
            **kwargs: Additional sensor parameters (poke_interval, timeout, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check if a record with the specified key values exists in Cassandra.

        Args:
            context (Context): Airflow task execution context

        Returns:
            bool: True if the record exists (sensor succeeds), False to continue waiting
        """
```

## Types

```python { .api }
from cassandra.cluster import Cluster, Session
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
    DCAwareRoundRobinPolicy,
    RoundRobinPolicy,
    TokenAwarePolicy,
    WhiteListRoundRobinPolicy,
)
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin
from collections.abc import Sequence
from typing import Any, TypeAlias, TYPE_CHECKING

# Load balancing policy type alias
Policy: TypeAlias = (
    DCAwareRoundRobinPolicy |
    RoundRobinPolicy |
    TokenAwarePolicy |
    WhiteListRoundRobinPolicy
)
```

## Version Compatibility

The package includes version compatibility utilities for different Airflow versions:

```python { .api }
# Version compatibility constants
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Compatibility imports (automatically selected based on Airflow version)
BaseHook            # Base class for hooks
BaseSensorOperator  # Base class for sensors
```
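The module path for these constants is not shown above; the sketch below assumes a provider-internal `version_compat` module and guards the import so code degrades gracefully when the provider (or that assumed path) is unavailable:

```python
# Sketch with an assumed module path: guard the compatibility import so
# version-dependent code still runs when the provider is not installed.
try:
    from airflow.providers.apache.cassandra.version_compat import AIRFLOW_V_3_0_PLUS
except ImportError:
    AIRFLOW_V_3_0_PLUS = False  # conservative fallback

if AIRFLOW_V_3_0_PLUS:
    print("Running on Airflow 3.0+")
```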

## Usage Examples

### Waiting for Table Creation

```python
from airflow.providers.apache.cassandra.sensors.table import CassandraTableSensor

# Wait for a table to be created before proceeding
table_sensor = CassandraTableSensor(
    task_id="wait_for_user_table",
    table="production.users",  # keyspace.table format
    cassandra_conn_id="prod_cassandra",
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # Time out after 1 hour
)
```

### Monitoring Record Availability

```python
from airflow.providers.apache.cassandra.sensors.record import CassandraRecordSensor

# Wait for a specific user record to be inserted
record_sensor = CassandraRecordSensor(
    task_id="wait_for_user_data",
    table="production.users",
    keys={
        "user_id": "{{ ds }}",  # Template support for dynamic values
        "region": "us-east-1",
    },
    cassandra_conn_id="prod_cassandra",
    poke_interval=60,
    timeout=7200,
)
```

### Custom Load Balancing

```python
# Store this JSON in the connection's Extra field to apply a custom
# load balancing policy to the "cassandra_with_lb_policy" connection
connection_extra = {
    "load_balancing_policy": "DCAwareRoundRobinPolicy",
    "load_balancing_policy_args": {
        "local_dc": "datacenter1",
        "used_hosts_per_remote_dc": 2
    }
}

hook = CassandraHook("cassandra_with_lb_policy")
session = hook.get_conn()
# Use the session for queries
hook.shutdown_cluster()
```

### Error Handling

```python
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

def safe_cassandra_operation():
    hook = None
    try:
        hook = CassandraHook("cassandra_default")

        if not hook.table_exists("keyspace.required_table"):
            raise ValueError("Required table does not exist")

        record_exists = hook.record_exists("keyspace.users", {"id": "123"})
        return record_exists

    except Exception as e:
        print(f"Cassandra operation failed: {e}")
        return False
    finally:
        if hook:
            hook.shutdown_cluster()
```