or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md, auth-policies.md, cluster-session.md, cql-types.md, cqlengine-orm.md, index.md, metadata.md, query-execution.md

docs/auth-policies.md

0

# Authentication & Policies

1

2

This page covers the authentication providers, load balancing policies, retry policies, reconnection policies, conviction policies, and host state listeners of the Cassandra Python driver. Together, these policies make cluster connections robust and configurable for production deployments.

3

4

## Capabilities

5

6

### Authentication

7

8

Authentication providers for secure cluster connections with various authentication mechanisms.

9

10

```python { .api }

11

class AuthProvider:

12

def new_authenticator(self, host):

13

"""

14

Create a new Authenticator instance for the given host.

15

16

Parameters:

17

- host (Host): The host to authenticate against

18

19

Returns:

20

Authenticator: Authenticator instance for this host

21

"""

22

23

class Authenticator:

24

def initial_response(self):

25

"""

26

Get the initial response for SASL authentication.

27

28

Returns:

29

bytes: Initial authentication response

30

"""

31

32

def evaluate_challenge(self, challenge):

33

"""

34

Evaluate a challenge from the server.

35

36

Parameters:

37

- challenge (bytes): Challenge bytes from server

38

39

Returns:

40

bytes: Response to the challenge

41

"""

42

43

def on_authentication_success(self, token):

44

"""

45

Called when authentication succeeds.

46

47

Parameters:

48

- token (bytes): Success token from server

49

"""

50

51

class PlainTextAuthProvider(AuthProvider):

52

def __init__(self, username, password):

53

"""

54

Authentication provider using username and password.

55

56

Parameters:

57

- username (str): Username for authentication

58

- password (str): Password for authentication

59

"""

60

61

def new_authenticator(self, host):

62

"""

63

Create a PlainTextAuthenticator for the host.

64

65

Parameters:

66

- host (Host): Target host

67

68

Returns:

69

PlainTextAuthenticator: Authenticator instance

70

"""

71

72

class PlainTextAuthenticator(Authenticator):

73

def __init__(self, username, password):

74

"""

75

SASL PLAIN authenticator for username/password authentication.

76

77

Parameters:

78

- username (str): Username for authentication

79

- password (str): Password for authentication

80

"""

81

82

class SaslAuthProvider(AuthProvider):

83

def __init__(self, **sasl_kwargs):

84

"""

85

Generic SASL authentication provider (requires puresasl package).

86

87

Parameters:

88

- sasl_kwargs: Keyword arguments passed to puresasl.client.SASLClient

89

"""

90

91

class SaslAuthenticator(Authenticator):

92

def __init__(self, sasl_kwargs):

93

"""

94

Generic SASL authenticator using puresasl.

95

96

Parameters:

97

- sasl_kwargs (dict): Arguments for SASLClient initialization

98

"""

99

```

100

101

### Load Balancing Policies

102

103

Policies for selecting which hosts to use for query execution and managing host distances.

104

105

```python { .api }

106

class HostDistance:

107

"""Constants for categorizing host distances."""

108

109

LOCAL = 0

110

"""Hosts in the local datacenter"""

111

112

REMOTE = 1

113

"""Hosts in remote datacenters"""

114

115

IGNORED = -1

116

"""Hosts that should be ignored"""

117

118

class LoadBalancingPolicy:

119

def distance(self, host):

120

"""

121

Return the distance designation for a host.

122

123

Parameters:

124

- host (Host): The host to categorize

125

126

Returns:

127

int: HostDistance constant (LOCAL, REMOTE, or IGNORED)

128

"""

129

130

def populate(self, cluster, hosts):

131

"""

132

Initialize the policy with cluster information.

133

134

Parameters:

135

- cluster (Cluster): The cluster instance

136

- hosts (list): List of all known hosts

137

"""

138

139

def make_query_plan(self, working_keyspace=None, query=None):

140

"""

141

Generate a query plan (ordered list of hosts to try).

142

143

Parameters:

144

- working_keyspace (str): Current keyspace

145

- query (Statement): Query being executed

146

147

Returns:

148

list: Ordered list of Host objects to try

149

"""

150

151

def on_up(self, host):

152

"""

153

Called when a host comes online.

154

155

Parameters:

156

- host (Host): Host that came online

157

"""

158

159

def on_down(self, host):

160

"""

161

Called when a host goes offline.

162

163

Parameters:

164

- host (Host): Host that went offline

165

"""

166

167

def on_add(self, host):

168

"""

169

Called when a new host is added to the cluster.

170

171

Parameters:

172

- host (Host): Host that was added

173

"""

174

175

def on_remove(self, host):

176

"""

177

Called when a host is removed from the cluster.

178

179

Parameters:

180

- host (Host): Host that was removed

181

"""

182

183

class RoundRobinPolicy(LoadBalancingPolicy):

184

def __init__(self):

185

"""

186

Simple round-robin load balancing across all hosts.

187

Treats all hosts as LOCAL distance.

188

"""

189

190

class DCAwareRoundRobinPolicy(LoadBalancingPolicy):

191

def __init__(self, local_dc=None, used_hosts_per_remote_dc=0):

192

"""

193

Datacenter-aware round-robin load balancing.

194

195

Parameters:

196

- local_dc (str): Name of the local datacenter

197

- used_hosts_per_remote_dc (int): Number of remote hosts to use per remote DC

198

"""

199

200

class TokenAwarePolicy(LoadBalancingPolicy):

201

def __init__(self, child_policy, shuffle_replicas=True):

202

"""

203

Token-aware load balancing wrapper that routes queries to replica nodes.

204

205

Parameters:

206

- child_policy (LoadBalancingPolicy): Underlying load balancing policy

207

- shuffle_replicas (bool): Whether to shuffle replica order

208

"""

209

210

class WhiteListRoundRobinPolicy(LoadBalancingPolicy):

211

def __init__(self, hosts):

212

"""

213

Round-robin load balancing limited to a whitelist of hosts.

214

215

Parameters:

216

- hosts (list): List of allowed host addresses

217

"""

218

```

219

220

### Retry Policies

221

222

Policies for handling query failures and determining retry behavior.

223

224

```python { .api }

225

class RetryDecision:

226

"""Decision types for retry policy responses."""

227

228

RETHROW = 0

229

"""Re-raise the exception without retrying"""

230

231

RETRY = 1

232

"""Retry the query"""

233

234

IGNORE = 2

235

"""Ignore the error and return an empty result"""

236

237

class RetryPolicy:

238

def on_read_timeout(self, query, consistency_level, required_responses, received_responses, data_retrieved, retry_num):

239

"""

240

Handle read timeout errors.

241

242

Parameters:

243

- query (Statement): The query that timed out

244

- consistency_level (int): Consistency level used

245

- required_responses (int): Number of responses required

246

- received_responses (int): Number of responses received

247

- data_retrieved (bool): Whether data was retrieved before timeout

248

- retry_num (int): Number of retries already attempted

249

250

Returns:

251

tuple: (RetryDecision, ConsistencyLevel or None)

252

"""

253

254

def on_write_timeout(self, query, consistency_level, write_type, required_responses, received_responses, retry_num):

255

"""

256

Handle write timeout errors.

257

258

Parameters:

259

- query (Statement): The query that timed out

260

- consistency_level (int): Consistency level used

261

- write_type (int): Type of write operation, given as one of the WriteType constants (e.g. WriteType.SIMPLE)

262

- required_responses (int): Number of responses required

263

- received_responses (int): Number of responses received

264

- retry_num (int): Number of retries already attempted

265

266

Returns:

267

tuple: (RetryDecision, ConsistencyLevel or None)

268

"""

269

270

def on_unavailable(self, query, consistency_level, required_replicas, alive_replicas, retry_num):

271

"""

272

Handle unavailable errors.

273

274

Parameters:

275

- query (Statement): The query that failed

276

- consistency_level (int): Consistency level used

277

- required_replicas (int): Number of replicas required

278

- alive_replicas (int): Number of replicas alive

279

- retry_num (int): Number of retries already attempted

280

281

Returns:

282

tuple: (RetryDecision, ConsistencyLevel or None)

283

"""

284

285

def on_request_error(self, query, consistency_level, error, retry_num):

286

"""

287

Handle general request errors.

288

289

Parameters:

290

- query (Statement): The query that failed

291

- consistency_level (int): Consistency level used

292

- error (Exception): The error that occurred

293

- retry_num (int): Number of retries already attempted

294

295

Returns:

296

tuple: (RetryDecision, ConsistencyLevel or None)

297

"""

298

299

class FallthroughRetryPolicy(RetryPolicy):

300

def __init__(self):

301

"""

302

Never retry queries; always re-raise exceptions.

303

Note: this is not the driver's default; unless another policy is configured, the driver uses the base RetryPolicy.

304

"""

305

306

class DowngradingConsistencyRetryPolicy(RetryPolicy):

307

def __init__(self):

308

"""

309

Retry queries with degraded consistency levels on certain failures.

310

311

Behavior:

312

- Read timeouts: Retry once with ONE if data was retrieved

313

- Write timeouts: Retry once with ONE for SIMPLE writes

314

- Unavailable: Retry once with lower consistency level

315

"""

316

```

317

318

### Reconnection Policies

319

320

Policies for managing reconnection delays when hosts become unavailable.

321

322

```python { .api }

323

class ReconnectionPolicy:

324

def new_schedule(self):

325

"""

326

Create a new reconnection schedule.

327

328

Returns:

329

generator: Generator yielding delay times in seconds

330

"""

331

332

class ConstantReconnectionPolicy(ReconnectionPolicy):

333

def __init__(self, delay, max_attempts=None):

334

"""

335

Reconnection policy with constant delay between attempts.

336

337

Parameters:

338

- delay (float): Delay in seconds between reconnection attempts

339

- max_attempts (int): Maximum number of attempts (None for unlimited)

340

"""

341

342

class ExponentialReconnectionPolicy(ReconnectionPolicy):

343

def __init__(self, base_delay, max_delay, max_attempts=None):

344

"""

345

Reconnection policy with exponential backoff.

346

347

Parameters:

348

- base_delay (float): Base delay in seconds for first attempt

349

- max_delay (float): Maximum delay in seconds

350

- max_attempts (int): Maximum number of attempts (None for unlimited)

351

"""

352

```

353

354

### Conviction Policies

355

356

Policies for determining when to mark hosts as failed.

357

358

```python { .api }

359

class ConvictionPolicy:

360

def add_failure(self, host, connection_exc):

361

"""

362

Record a connection failure for a host.

363

364

Parameters:

365

- host (Host): The host that failed

366

- connection_exc (Exception): The connection exception

367

368

Returns:

369

bool: True if the host should be convicted (marked as down)

370

"""

371

372

def reset(self, host):

373

"""

374

Reset failure tracking for a host.

375

376

Parameters:

377

- host (Host): The host to reset

378

"""

379

380

class SimpleConvictionPolicy(ConvictionPolicy):

381

def __init__(self):

382

"""

383

Simple conviction policy that marks hosts down on first failure.

384

"""

385

```

386

387

### Write Types

388

389

Constants for different write operation types used in retry policies.

390

391

```python { .api }

392

class WriteType:

393

"""Constants for write operation types."""

394

395

SIMPLE = 0

396

"""Write to a single partition key (atomic and isolated)"""

397

398

BATCH = 1

399

"""Write to multiple partition keys using distributed batch log (atomic)"""

400

401

UNLOGGED_BATCH = 2

402

"""Write to multiple partition keys without batch log (not atomic)"""

403

404

COUNTER = 3

405

"""Counter write operation (should not be replayed)"""

406

407

BATCH_LOG = 4

408

"""Initial write to distributed batch log (internal Cassandra operation)"""

409

410

CAS = 5

411

"""Compare-and-set (conditional) write operation"""

412

```

413

414

### Host State Listeners

415

416

Listeners for monitoring host state changes in the cluster.

417

418

```python { .api }

419

class HostStateListener:

420

def on_add(self, host):

421

"""

422

Called when a new host is added to the cluster.

423

424

Parameters:

425

- host (Host): The host that was added

426

"""

427

428

def on_up(self, host):

429

"""

430

Called when a host comes back online.

431

432

Parameters:

433

- host (Host): The host that came online

434

"""

435

436

def on_down(self, host):

437

"""

438

Called when a host goes offline.

439

440

Parameters:

441

- host (Host): The host that went offline

442

"""

443

444

def on_remove(self, host):

445

"""

446

Called when a host is removed from the cluster.

447

448

Parameters:

449

- host (Host): The host that was removed

450

"""

451

```

452

453

## Usage Examples

454

455

### Authentication Setup

456

457

```python

458

from cassandra.cluster import Cluster

459

from cassandra.auth import PlainTextAuthProvider, SaslAuthProvider

460

461

# Basic username/password authentication

462

auth_provider = PlainTextAuthProvider(

463

username='cassandra_user',

464

password='secure_password'

465

)

466

467

cluster = Cluster(

468

contact_points=['127.0.0.1'],

469

auth_provider=auth_provider

470

)

471

472

# SASL authentication (requires puresasl package)

473

sasl_auth = SaslAuthProvider(

474

mechanism='GSSAPI',

475

service='cassandra',

476

qops=['auth']

477

)

478

479

cluster_sasl = Cluster(

480

contact_points=['127.0.0.1'],

481

auth_provider=sasl_auth

482

)

483

```

484

485

### Load Balancing Configuration

486

487

```python

488

from cassandra.policies import (

489

DCAwareRoundRobinPolicy,

490

TokenAwarePolicy,

491

WhiteListRoundRobinPolicy,

492

HostDistance

493

)

494

495

# Datacenter-aware load balancing

496

dc_aware_policy = DCAwareRoundRobinPolicy(

497

local_dc='datacenter1',

498

used_hosts_per_remote_dc=2 # Use 2 hosts from each remote DC

499

)

500

501

# Token-aware routing with DC-aware fallback

502

token_aware_policy = TokenAwarePolicy(dc_aware_policy)

503

504

# Whitelist policy for specific hosts only

505

whitelist_policy = WhiteListRoundRobinPolicy([

506

'192.168.1.10',

507

'192.168.1.11',

508

'192.168.1.12'

509

])

510

511

cluster = Cluster(

512

contact_points=['192.168.1.10'],

513

load_balancing_policy=token_aware_policy

514

)

515

516

# Custom load balancing policy

517

class CustomLoadBalancingPolicy(LoadBalancingPolicy):

518

def distance(self, host):

519

# Mark hosts in 192.168.1.x as local, others as remote

520

if host.address.startswith('192.168.1.'):

521

return HostDistance.LOCAL

522

else:

523

return HostDistance.REMOTE

524

525

def make_query_plan(self, working_keyspace=None, query=None):

526

# Custom host selection logic

527

local_hosts = [h for h in self.hosts if self.distance(h) == HostDistance.LOCAL]

528

remote_hosts = [h for h in self.hosts if self.distance(h) == HostDistance.REMOTE]

529

530

# Return local hosts first, then remote

531

return local_hosts + remote_hosts[:2] # Max 2 remote hosts

532

533

custom_policy = CustomLoadBalancingPolicy()

534

cluster = Cluster(

535

contact_points=['192.168.1.10'],

536

load_balancing_policy=custom_policy

537

)

538

```

539

540

### Retry Policy Configuration

541

542

```python

543

from cassandra.policies import (

544

FallthroughRetryPolicy,

545

DowngradingConsistencyRetryPolicy,

546

RetryPolicy,

547

RetryDecision

548

)

549

from cassandra import ConsistencyLevel

550

551

# Use fallthrough policy (no retries)

552

fallthrough_policy = FallthroughRetryPolicy()

553

554

# Use downgrading consistency policy

555

downgrading_policy = DowngradingConsistencyRetryPolicy()

556

557

cluster = Cluster(

558

contact_points=['127.0.0.1'],

559

default_retry_policy=downgrading_policy

560

)

561

562

# Custom retry policy

563

class AggressiveRetryPolicy(RetryPolicy):

564

def on_read_timeout(self, query, consistency_level, required_responses,

565

received_responses, data_retrieved, retry_num):

566

if retry_num < 3: # Retry up to 3 times

567

if data_retrieved:

568

# Retry with ONE if we got some data

569

return (RetryDecision.RETRY, ConsistencyLevel.ONE)

570

elif received_responses > 0:

571

# Retry with lower consistency if we got any response

572

return (RetryDecision.RETRY, ConsistencyLevel.ONE)

573

574

# Give up after 3 retries

575

return (RetryDecision.RETHROW, None)

576

577

def on_write_timeout(self, query, consistency_level, write_type,

578

required_responses, received_responses, retry_num):

579

if retry_num < 2 and write_type == 'SIMPLE':

580

# Retry simple writes once with ONE

581

return (RetryDecision.RETRY, ConsistencyLevel.ONE)

582

583

return (RetryDecision.RETHROW, None)

584

585

aggressive_policy = AggressiveRetryPolicy()

586

587

# Apply retry policy to cluster or individual statements

588

cluster.default_retry_policy = aggressive_policy

589

590

# Or apply to specific statements

591

from cassandra.query import SimpleStatement

592

stmt = SimpleStatement("SELECT * FROM users WHERE id = %s")

593

stmt.retry_policy = aggressive_policy

594

```

595

596

### Reconnection Policy Configuration

597

598

```python

599

from cassandra.policies import (

600

ConstantReconnectionPolicy,

601

ExponentialReconnectionPolicy

602

)

603

604

# Constant delay reconnection

605

constant_policy = ConstantReconnectionPolicy(

606

delay=5.0, # Wait 5 seconds between attempts

607

max_attempts=10 # Try up to 10 times

608

)

609

610

# Exponential backoff reconnection

611

exponential_policy = ExponentialReconnectionPolicy(

612

base_delay=1.0, # Start with 1 second

613

max_delay=60.0, # Cap at 60 seconds

614

max_attempts=None # Retry indefinitely

615

)

616

617

cluster = Cluster(

618

contact_points=['127.0.0.1'],

619

reconnection_policy=exponential_policy

620

)

621

622

# Custom reconnection policy

623

class CustomReconnectionPolicy(ReconnectionPolicy):

624

def __init__(self, delays):

625

self.delays = delays

626

627

def new_schedule(self):

628

# Use custom delay sequence

629

for delay in self.delays:

630

yield delay

631

632

# After custom sequence, use constant 30 second delays

633

while True:

634

yield 30.0

635

636

custom_reconnect = CustomReconnectionPolicy([1, 2, 5, 10, 15, 20])

637

cluster = Cluster(

638

contact_points=['127.0.0.1'],

639

reconnection_policy=custom_reconnect

640

)

641

```

642

643

### Host State Monitoring

644

645

```python

646

from cassandra.policies import HostStateListener

647

648

class MyHostStateListener(HostStateListener):

649

def on_add(self, host):

650

print(f"Host added: {host.address}")

651

652

def on_up(self, host):

653

print(f"Host came online: {host.address}")

654

# Could trigger application-level notifications

655

656

def on_down(self, host):

657

print(f"Host went offline: {host.address}")

658

# Could trigger alerting systems

659

660

def on_remove(self, host):

661

print(f"Host removed: {host.address}")

662

663

# Register the listener

664

cluster = Cluster(contact_points=['127.0.0.1'])

665

cluster.register_listener(MyHostStateListener())

666

session = cluster.connect()

667

668

# The listener will now receive notifications about host state changes

669

```

670

671

### Complete Policy Configuration

672

673

```python

674

from cassandra.cluster import Cluster

675

from cassandra.auth import PlainTextAuthProvider

676

from cassandra.policies import (

677

DCAwareRoundRobinPolicy,

678

TokenAwarePolicy,

679

DowngradingConsistencyRetryPolicy,

680

ExponentialReconnectionPolicy,

681

SimpleConvictionPolicy

682

)

683

684

# Complete production configuration

685

auth_provider = PlainTextAuthProvider(

686

username='app_user',

687

password='secure_password'

688

)

689

690

load_balancing_policy = TokenAwarePolicy(

691

DCAwareRoundRobinPolicy(

692

local_dc='DC1',

693

used_hosts_per_remote_dc=1

694

)

695

)

696

697

retry_policy = DowngradingConsistencyRetryPolicy()

698

699

reconnection_policy = ExponentialReconnectionPolicy(

700

base_delay=1.0,

701

max_delay=60.0

702

)

703

704

conviction_policy = SimpleConvictionPolicy()

705

706

cluster = Cluster(

707

contact_points=['10.0.1.1', '10.0.1.2', '10.0.1.3'],

708

port=9042,

709

auth_provider=auth_provider,

710

load_balancing_policy=load_balancing_policy,

711

default_retry_policy=retry_policy,

712

reconnection_policy=reconnection_policy,

713

conviction_policy=conviction_policy,

714

compression=True,

715

protocol_version=4

716

)

717

718

session = cluster.connect('my_keyspace')

719

```