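# Authentication & Policies

This page covers authentication providers and the load balancing, retry, reconnection, and conviction policies that control how the driver connects to a cluster and routes requests across it. Policies are pluggable: configure them through `Cluster` keyword arguments, or subclass the base classes below to customize behavior for production deployments.

## Capabilities

### Authentication

An authentication provider creates a per-connection authenticator that answers the server's SASL challenges. Two providers are available: username/password (SASL PLAIN) and any mechanism supported by the `puresasl` package.

```python { .api }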
class AuthProvider:
    """Factory for per-connection Authenticator objects."""

    def new_authenticator(self, host):
        """
        Build an Authenticator for a connection that is being opened to ``host``.

        Parameters:
        - host: The node the new connection targets

        Returns:
        Authenticator: A fresh authenticator dedicated to that connection
        """
class Authenticator:
    """Handles one SASL exchange between the driver and a single server connection."""

    def initial_response(self):
        """
        Produce the first message sent to the server in the SASL exchange.

        Returns:
        bytes: Initial authentication response
        """

    def evaluate_challenge(self, challenge):
        """
        Compute the reply to a challenge sent by the server.

        Parameters:
        - challenge (bytes): Challenge bytes received from the server

        Returns:
        bytes: Reply to send back to the server
        """

    def on_authentication_success(self, token):
        """
        Hook invoked once the server has accepted the credentials.

        Parameters:
        - token (bytes): Final token sent by the server on success
        """
class PlainTextAuthProvider(AuthProvider):
    """Username/password authentication provider."""

    def __init__(self, username, password):
        """
        Configure the credentials presented on every new connection.

        Parameters:
        - username (str): Login name presented to the server
        - password (str): Password presented to the server
        """

    def new_authenticator(self, host):
        """
        Return a PlainTextAuthenticator for a connection to ``host``.

        Parameters:
        - host: The node the connection targets

        Returns:
        PlainTextAuthenticator: Authenticator for that connection
        """
class PlainTextAuthenticator(Authenticator):
    """SASL PLAIN authenticator that sends a username and password."""

    def __init__(self, username, password):
        """
        Create an authenticator for a single connection.

        Parameters:
        - username (str): Login name to send
        - password (str): Password to send
        """
class SaslAuthProvider(AuthProvider):
    """
    Authentication provider for arbitrary SASL mechanisms.

    Requires the third-party ``puresasl`` package.
    """

    def __init__(self, **sasl_kwargs):
        """
        Parameters:
        - **sasl_kwargs: Options forwarded to ``puresasl.client.SASLClient``
          (for example ``mechanism``, ``service``, ``qops``)

        NOTE(review): in the DataStax driver the target host is supplied per
        connection by ``new_authenticator``, and passing ``host`` here is
        rejected. Confirm this for your driver version.
        """
class SaslAuthenticator(Authenticator):
    """Authenticator backed by a ``puresasl`` SASLClient."""

    # Unlike SaslAuthProvider, this takes the options as one plain dict,
    # not as **kwargs.
    def __init__(self, sasl_kwargs):
        """
        Parameters:
        - sasl_kwargs (dict): Options used to construct the SASLClient
        """
```

### Load Balancing Policies

Load balancing policies decide which hosts a query is sent to, and in what order. They also classify each host by distance: local, remote, or ignored.

```python { .api }
class HostDistance:
    """Constants classifying how "far" a host is from the client."""

    # Hosts that should be ignored
    IGNORED = -1
    # Hosts in the local datacenter
    LOCAL = 0
    # Hosts in remote datacenters
    REMOTE = 1
class LoadBalancingPolicy:
    """
    Base class for policies that choose which hosts serve each request.

    Besides ranking hosts, a policy receives membership callbacks
    (``on_up``/``on_down``/``on_add``/``on_remove``) so that it can keep its
    view of the cluster current.
    """

    def populate(self, cluster, hosts):
        """
        Seed the policy with the cluster and its currently known hosts.

        Parameters:
        - cluster (Cluster): The owning cluster instance
        - hosts (list): Every host known at initialization time
        """

    def distance(self, host):
        """
        Classify ``host`` as local, remote, or ignored.

        Parameters:
        - host (Host): The host to classify

        Returns:
        int: HostDistance.LOCAL, HostDistance.REMOTE, or HostDistance.IGNORED
        """

    def make_query_plan(self, working_keyspace=None, query=None):
        """
        Produce the ordered hosts to try for one request.

        Parameters:
        - working_keyspace (str): The keyspace currently in use, if any
        - query (Statement): The statement being executed, if any

        Returns:
        list: Host objects in the order they should be tried
        """

    def on_up(self, host):
        """
        React to ``host`` coming online.

        Parameters:
        - host (Host): The host that came online
        """

    def on_down(self, host):
        """
        React to ``host`` going offline.

        Parameters:
        - host (Host): The host that went offline
        """

    def on_add(self, host):
        """
        React to ``host`` being added to the cluster.

        Parameters:
        - host (Host): The host that was added
        """

    def on_remove(self, host):
        """
        React to ``host`` being removed from the cluster.

        Parameters:
        - host (Host): The host that was removed
        """
class RoundRobinPolicy(LoadBalancingPolicy):
    """Rotate evenly through every known host, treating all of them as LOCAL."""

    def __init__(self):
        """Create the policy; it takes no configuration."""
class DCAwareRoundRobinPolicy(LoadBalancingPolicy):
    """Round-robin load balancing that distinguishes the local datacenter from remote ones."""

    def __init__(self, local_dc=None, used_hosts_per_remote_dc=0):
        """
        Parameters:
        - local_dc (str): Name of the datacenter treated as local
        - used_hosts_per_remote_dc (int): How many hosts from each remote
          datacenter may be used (0, the default, uses none)
        """
class TokenAwarePolicy(LoadBalancingPolicy):
    """Wrapper around a child policy that routes each query to replica nodes of its partition."""

    def __init__(self, child_policy, shuffle_replicas=True):
        """
        Parameters:
        - child_policy (LoadBalancingPolicy): The underlying (wrapped) policy
        - shuffle_replicas (bool): Randomize the order of replicas when True
        """
class WhiteListRoundRobinPolicy(LoadBalancingPolicy):
    """Round-robin load balancing restricted to an explicit list of hosts."""

    def __init__(self, hosts):
        """
        Parameters:
        - hosts (list): Addresses of the only hosts that may be used
        """
```

### Retry Policies

Retry policies decide how the driver reacts when a request fails. The options are to retry it (optionally at a different consistency level), rethrow the error, or ignore it.

```python { .api }
class RetryDecision:
    """
    Decision codes returned as the first element of a retry policy's
    ``(decision, consistency)`` tuple.

    The values match the constants the DataStax driver defines directly on
    ``RetryPolicy`` (``RetryPolicy.RETRY``, ``RetryPolicy.RETHROW``, ...).

    NOTE(review): the driver does not appear to export a separate
    ``RetryDecision`` class from ``cassandra.policies``. Use the
    ``RetryPolicy`` attributes in application code.
    """

    # Retry the query on the same host
    RETRY = 0
    # Re-raise the exception without retrying
    RETHROW = 1
    # Ignore the error and return an empty result
    IGNORE = 2
    # Retry the query on the next host in the query plan
    RETRY_NEXT_HOST = 3
class RetryPolicy:
    """
    Base class for policies that decide what to do when a request fails.

    Every hook returns a ``(decision, consistency)`` tuple. ``decision`` is one
    of the constants below. ``consistency`` is the consistency level to retry
    with, or None when not retrying or to keep the current level.

    The driver passes several hook arguments by keyword, so overriding
    methods must keep the parameter names shown here. In particular, use
    ``consistency``, not ``consistency_level``.
    """

    # Retry on the same host
    RETRY = 0
    # Re-raise the error; no further retries
    RETHROW = 1
    # Ignore the error and return an empty result; no further retries
    IGNORE = 2
    # Retry on the next host in the query plan
    RETRY_NEXT_HOST = 3

    def on_read_timeout(self, query, consistency, required_responses,
                        received_responses, data_retrieved, retry_num):
        """
        Handle a read timeout reported by the coordinator.

        Parameters:
        - query (Statement): The query that timed out
        - consistency (int): Consistency level the query was executed at
        - required_responses (int): Number of replica responses required
        - received_responses (int): Number of replica responses received
        - data_retrieved (bool): Whether a replica returned data before the timeout
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (decision, ConsistencyLevel or None)
        """

    def on_write_timeout(self, query, consistency, write_type,
                         required_responses, received_responses, retry_num):
        """
        Handle a write timeout reported by the coordinator.

        Parameters:
        - query (Statement): The query that timed out
        - consistency (int): Consistency level the query was executed at
        - write_type (int): A WriteType constant (e.g. WriteType.SIMPLE),
          not the type's name as a string
        - required_responses (int): Number of replica acknowledgements required
        - received_responses (int): Number of replica acknowledgements received
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (decision, ConsistencyLevel or None)
        """

    def on_unavailable(self, query, consistency, required_replicas,
                       alive_replicas, retry_num):
        """
        Handle an "unavailable" error: the coordinator knew in advance that
        too few replicas were alive to satisfy the consistency level.

        Parameters:
        - query (Statement): The query that failed
        - consistency (int): Consistency level the query was executed at
        - required_replicas (int): Number of replicas required
        - alive_replicas (int): Number of replicas known to be alive
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (decision, ConsistencyLevel or None)
        """

    def on_request_error(self, query, consistency, error, retry_num):
        """
        Handle any other request error (e.g. a connection or server error).

        Parameters:
        - query (Statement): The query that failed
        - consistency (int): Consistency level the query was executed at
        - error (Exception): The error that occurred
        - retry_num (int): Number of retries already attempted

        Returns:
        tuple: (decision, ConsistencyLevel or None)
        """
class FallthroughRetryPolicy(RetryPolicy):
    """Never retry: every failure is re-raised to the caller."""

    def __init__(self):
        """
        Create the policy; it takes no configuration.

        This is *not* the driver's default retry policy. When none is
        configured, the base ``RetryPolicy`` is used, and it does retry in a
        few narrow cases.
        """
class DowngradingConsistencyRetryPolicy(RetryPolicy):
    def __init__(self):
        """
        Retry once, possibly at a lower consistency level, after certain failures.

        Only the first failure of a request is handled; later ones are rethrown.
        - Read timeouts:
          - If fewer replicas answered than required, retry at the highest of
            THREE/TWO/ONE that the answering replicas can satisfy (rethrow if
            none answered).
          - If enough replicas answered but no data was returned, retry at the
            same level.
          - Serial reads are rethrown.
        - Write timeouts:
          - SIMPLE, BATCH and COUNTER writes are ignored if at least one
            replica acknowledged, and rethrown otherwise.
          - UNLOGGED_BATCH is retried at a downgraded level.
          - BATCH_LOG is retried at the same level.
        - Unavailable:
          - Retry at the highest of THREE/TWO/ONE the alive replicas can satisfy.
          - Serial (lightweight-transaction) requests are retried on the next host.

        NOTE: downgrading silently weakens consistency guarantees, and recent
        driver releases deprecate this policy.
        """
```

### Reconnection Policies

Reconnection policies control how long the driver waits between attempts to reconnect to a host that has become unavailable.

```python { .api }
class ReconnectionPolicy:
    """Base class for policies that pace reconnection attempts."""

    def new_schedule(self):
        """
        Start a fresh reconnection schedule.

        Returns:
        generator: Yields the delay, in seconds, before each successive attempt
        """
class ConstantReconnectionPolicy(ReconnectionPolicy):
    """Wait the same fixed delay before every reconnection attempt."""

    def __init__(self, delay, max_attempts=None):
        """
        Parameters:
        - delay (float): Seconds to wait between reconnection attempts
        - max_attempts (int or None): Total attempts before giving up;
          None keeps retrying forever

        NOTE(review): the DataStax driver's own signature defaults
        ``max_attempts`` to 64 rather than None, so omitting it there does
        *not* retry forever. Pass ``max_attempts=None`` explicitly for
        unlimited attempts, and confirm against your installed driver version.
        """
class ExponentialReconnectionPolicy(ReconnectionPolicy):
    """Back off exponentially between reconnection attempts, up to a ceiling."""

    def __init__(self, base_delay, max_delay, max_attempts=None):
        """
        Parameters:
        - base_delay (float): Delay in seconds before the first attempt
        - max_delay (float): Upper bound in seconds for any single delay
        - max_attempts (int or None): Total attempts before giving up;
          None keeps retrying forever

        NOTE(review): the DataStax driver's own signature defaults
        ``max_attempts`` to 64 rather than None, so omitting it there does
        *not* retry forever. Pass ``max_attempts=None`` explicitly for
        unlimited attempts, and confirm against your installed driver version.
        """
```

### Conviction Policies

Conviction policies decide, after a connection failure, whether a host should be marked as down.

```python { .api }
class ConvictionPolicy:
    """
    Decides, from connection failures, when a host should be marked down.

    NOTE(review): in the DataStax driver a conviction policy is created per
    host, as ``__init__(self, host)``, and stores the host as ``self.host``.
    Its ``add_failure``/``reset`` therefore take no ``host`` argument.
    ``Cluster`` is configured with ``conviction_policy_factory`` (e.g. the
    class itself) rather than with an instance. Confirm this against your
    installed driver version before subclassing from the signature below.
    """

    def add_failure(self, host, connection_exc):
        """
        Record a connection failure for ``host``.

        Parameters:
        - host (Host): The host whose connection failed
        - connection_exc (Exception): The exception raised by the failed connection

        Returns:
        bool: True if the host should be convicted (marked as down)
        """

    def reset(self, host):
        """
        Forget previously recorded failures for ``host``.

        Parameters:
        - host (Host): The host whose failure history is cleared
        """
class SimpleConvictionPolicy(ConvictionPolicy):
    """Mark a host down as soon as a connection failure is recorded."""

    def __init__(self):
        """
        Create the policy; it takes no configuration.

        NOTE(review): the DataStax driver's implementation does not convict on
        ``OperationTimedOut`` (a client-side timeout). Any other connection
        failure marks the host down immediately. Confirm for your version.
        """
```

### Write Types

These constants identify the kind of write that timed out. A retry policy's `on_write_timeout` hook receives one of these integer values as its `write_type` argument.

```python { .api }
class WriteType:
    """
    Constants for write operation types.

    A retry policy's ``on_write_timeout`` receives one of these integer
    values as ``write_type``, so compare against the constant (for example
    ``write_type == WriteType.SIMPLE``), never against a name string.
    """

    # Write to a single partition key (atomic and isolated)
    SIMPLE = 0
    # Write to multiple partition keys using the distributed batch log (atomic)
    BATCH = 1
    # Write to multiple partition keys without the batch log (not atomic)
    UNLOGGED_BATCH = 2
    # Counter write operation (should not be replayed)
    COUNTER = 3
    # Initial write to the distributed batch log (internal Cassandra operation)
    BATCH_LOG = 4
    # Compare-and-set (conditional) write operation
    CAS = 5
    # Write that could not complete its materialized-view updates
    VIEW = 6
    # Write that could not complete its change-data-capture (CDC) operation
    CDC = 7
```

### Host State Listeners

Host state listeners are notified when hosts are added to or removed from the cluster, or change availability.

```python { .api }
class HostStateListener:
    """Receives notifications about cluster membership and host availability."""

    def on_add(self, host):
        """
        Notification that ``host`` was added to the cluster.

        Parameters:
        - host (Host): The newly added host
        """

    def on_up(self, host):
        """
        Notification that ``host`` is reachable again.

        Parameters:
        - host (Host): The host that came online
        """

    def on_down(self, host):
        """
        Notification that ``host`` became unreachable.

        Parameters:
        - host (Host): The host that went offline
        """

    def on_remove(self, host):
        """
        Notification that ``host`` left the cluster.

        Parameters:
        - host (Host): The removed host
        """
```

## Usage Examples

### Authentication Setup

```python
from cassandra.auth import PlainTextAuthProvider, SaslAuthProvider
from cassandra.cluster import Cluster

# Basic username/password authentication (SASL PLAIN)
auth_provider = PlainTextAuthProvider(username='cassandra_user', password='secure_password')
cluster = Cluster(contact_points=['127.0.0.1'], auth_provider=auth_provider)

# Generic SASL authentication (requires the puresasl package).
# These options are forwarded to puresasl's SASLClient.
sasl_options = {
    'mechanism': 'GSSAPI',
    'service': 'cassandra',
    'qops': ['auth'],
}
sasl_auth = SaslAuthProvider(**sasl_options)
cluster_sasl = Cluster(contact_points=['127.0.0.1'], auth_provider=sasl_auth)
```

### Load Balancing Configuration

```python
# Cluster and LoadBalancingPolicy are used later in this snippet, so import
# them here; the original snippet raised NameError when run on its own.
from cassandra.cluster import Cluster
from cassandra.policies import (
    DCAwareRoundRobinPolicy,
    HostDistance,
    LoadBalancingPolicy,
    TokenAwarePolicy,
    WhiteListRoundRobinPolicy,
)

# Datacenter-aware load balancing
dc_aware_policy = DCAwareRoundRobinPolicy(
    local_dc='datacenter1',
    used_hosts_per_remote_dc=2,  # Use 2 hosts from each remote DC
)

# Token-aware routing with DC-aware fallback
token_aware_policy = TokenAwarePolicy(dc_aware_policy)

# Whitelist policy for specific hosts only (an alternative to the above)
whitelist_policy = WhiteListRoundRobinPolicy([
    '192.168.1.10',
    '192.168.1.11',
    '192.168.1.12',
])

cluster = Cluster(
    contact_points=['192.168.1.10'],
    load_balancing_policy=token_aware_policy,
)
# Custom load balancing policy
class CustomLoadBalancingPolicy(LoadBalancingPolicy):
    """
    Prefer hosts on the 192.168.1.x subnet, then fall back to a few others.

    The host list is seeded by ``populate`` and kept current by the
    ``on_up``/``on_down``/``on_add``/``on_remove`` membership hooks. Without
    them, ``self.hosts`` would never be set.
    """

    local_prefix = '192.168.1.'
    max_remote_hosts = 2

    def __init__(self):
        super().__init__()
        self.hosts = ()

    def populate(self, cluster, hosts):
        # Rebind a new tuple instead of mutating, so a query plan that is
        # being built from the old tuple is never affected.
        self.hosts = tuple(hosts)

    def distance(self, host):
        # Mark hosts in 192.168.1.x as local, others as remote
        if host.address.startswith(self.local_prefix):
            return HostDistance.LOCAL
        return HostDistance.REMOTE

    def make_query_plan(self, working_keyspace=None, query=None):
        local_hosts, remote_hosts = [], []
        for host in self.hosts:
            if self.distance(host) == HostDistance.LOCAL:
                local_hosts.append(host)
            else:
                remote_hosts.append(host)
        # Return local hosts first, then at most `max_remote_hosts` remote ones
        return local_hosts + remote_hosts[:self.max_remote_hosts]

    def on_up(self, host):
        if host not in self.hosts:
            self.hosts = self.hosts + (host,)

    def on_down(self, host):
        self.hosts = tuple(h for h in self.hosts if h != host)

    def on_add(self, host):
        self.on_up(host)

    def on_remove(self, host):
        self.on_down(host)


custom_policy = CustomLoadBalancingPolicy()
cluster = Cluster(
    contact_points=['192.168.1.10'],
    load_balancing_policy=custom_policy,
)
```

### Retry Policy Configuration

```python
from cassandra import ConsistencyLevel, WriteType
from cassandra.cluster import Cluster
from cassandra.policies import (
    DowngradingConsistencyRetryPolicy,
    FallthroughRetryPolicy,
    RetryPolicy,
)

# Use fallthrough policy (no retries)
fallthrough_policy = FallthroughRetryPolicy()

# Use downgrading consistency policy
downgrading_policy = DowngradingConsistencyRetryPolicy()

cluster = Cluster(
    contact_points=['127.0.0.1'],
    default_retry_policy=downgrading_policy,
)


# Custom retry policy.
# The driver passes error details to these hooks as keyword arguments, so the
# parameter names (notably `consistency`) must match RetryPolicy's. Decisions
# are the RETRY / RETHROW / IGNORE / RETRY_NEXT_HOST constants on RetryPolicy.
class AggressiveRetryPolicy(RetryPolicy):
    """Retry read timeouts up to 3 times, and simple-write timeouts once, at ONE."""

    max_read_retries = 3

    def on_read_timeout(self, query, consistency, required_responses,
                        received_responses, data_retrieved, retry_num):
        # Retry with ONE if any replica answered, whether or not it sent data
        if retry_num < self.max_read_retries and (data_retrieved or received_responses > 0):
            return (self.RETRY, ConsistencyLevel.ONE)
        # Give up after 3 retries
        return (self.RETHROW, None)

    def on_write_timeout(self, query, consistency, write_type,
                         required_responses, received_responses, retry_num):
        # write_type is a WriteType constant (an int), not the string 'SIMPLE'
        if retry_num == 0 and write_type == WriteType.SIMPLE:
            # Retry simple writes once with ONE
            return (self.RETRY, ConsistencyLevel.ONE)
        return (self.RETHROW, None)
from cassandra.query import SimpleStatement

aggressive_policy = AggressiveRetryPolicy()

# A retry policy can be applied cluster-wide...
cluster.default_retry_policy = aggressive_policy

# ...or overridden on an individual statement
stmt = SimpleStatement("SELECT * FROM users WHERE id = %s")
stmt.retry_policy = aggressive_policy
```

### Reconnection Policy Configuration

```python
# ReconnectionPolicy (subclassed below) and Cluster were previously used
# without being imported in this snippet.
from cassandra.cluster import Cluster
from cassandra.policies import (
    ConstantReconnectionPolicy,
    ExponentialReconnectionPolicy,
    ReconnectionPolicy,
)

# Constant delay reconnection
constant_policy = ConstantReconnectionPolicy(
    delay=5.0,  # Wait 5 seconds between attempts
    max_attempts=10,  # Try up to 10 times
)

# Exponential backoff reconnection
exponential_policy = ExponentialReconnectionPolicy(
    base_delay=1.0,  # Start with 1 second
    max_delay=60.0,  # Cap at 60 seconds
    max_attempts=None,  # Retry indefinitely
)

cluster = Cluster(
    contact_points=['127.0.0.1'],
    reconnection_policy=exponential_policy,
)
# Custom reconnection policy
class CustomReconnectionPolicy(ReconnectionPolicy):
    """
    Reconnect after a fixed sequence of delays, then at a steady interval.

    Parameters:
    - delays (iterable of float): Initial delays in seconds, used in order
    - final_delay (float): Delay repeated forever once ``delays`` is used up
    """

    def __init__(self, delays, final_delay=30.0):
        # Copy the delays so later changes to the caller's list cannot alter
        # schedules created afterwards.
        self.delays = tuple(delays)
        self.final_delay = final_delay

    def new_schedule(self):
        # Each call returns an independent generator.
        yield from self.delays
        # After the custom sequence, repeat `final_delay` indefinitely
        while True:
            yield self.final_delay


custom_reconnect = CustomReconnectionPolicy([1, 2, 5, 10, 15, 20])
cluster = Cluster(
    contact_points=['127.0.0.1'],
    reconnection_policy=custom_reconnect,
)
641
```
642
643
### Host State Monitoring
644
645
```python
646
from cassandra.policies import HostStateListener
647
648
class MyHostStateListener(HostStateListener):
649
def on_add(self, host):
650
print(f"Host added: {host.address}")
651
652
def on_up(self, host):
653
print(f"Host came online: {host.address}")
654
# Could trigger application-level notifications
655
656
def on_down(self, host):
657
print(f"Host went offline: {host.address}")
658
# Could trigger alerting systems
659
660
def on_remove(self, host):
661
print(f"Host removed: {host.address}")
662
663
# Cluster was previously used without being imported in this snippet.
from cassandra.cluster import Cluster

# Register the listener, then connect
cluster = Cluster(contact_points=['127.0.0.1'])
cluster.register_listener(MyHostStateListener())
session = cluster.connect()

# The listener will now receive notifications about host state changes
```

### Complete Policy Configuration

```python
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import (
    DCAwareRoundRobinPolicy,
    DowngradingConsistencyRetryPolicy,
    ExponentialReconnectionPolicy,
    SimpleConvictionPolicy,
    TokenAwarePolicy,
)

# Complete production configuration
auth_provider = PlainTextAuthProvider(
    username='app_user',
    password='secure_password',
)

load_balancing_policy = TokenAwarePolicy(
    DCAwareRoundRobinPolicy(
        local_dc='DC1',
        used_hosts_per_remote_dc=1,
    )
)

# NOTE: downgrading trades consistency for availability, and recent driver
# releases deprecate this policy.
retry_policy = DowngradingConsistencyRetryPolicy()

reconnection_policy = ExponentialReconnectionPolicy(
    base_delay=1.0,
    max_delay=60.0,
)

cluster = Cluster(
    contact_points=['10.0.1.1', '10.0.1.2', '10.0.1.3'],
    port=9042,
    auth_provider=auth_provider,
    load_balancing_policy=load_balancing_policy,
    default_retry_policy=retry_policy,
    reconnection_policy=reconnection_policy,
    # Cluster accepts a conviction policy *factory*, not an instance: the
    # driver builds one policy per host by calling it with that host, so pass
    # the class itself. There is no `conviction_policy` keyword.
    conviction_policy_factory=SimpleConvictionPolicy,
    compression=True,
    protocol_version=4,
)

session = cluster.connect('my_keyspace')
```