# Core Connectivity

Core cluster connection management, session handling, and connection pooling. The `Cluster` and `Session` classes form the foundation of all cassandra-driver operations.

## Capabilities

### Cluster Management

The `Cluster` class manages connections to multiple Cassandra nodes, handles topology changes, and load-balances requests across the cluster.

```python { .api }
class Cluster:
    """Manage connections to multiple Cassandra nodes, track topology, and load-balance requests."""

    def __init__(
        self,
        contact_points=None,
        port=9042,
        executor_threads=2,
        auth_provider=None,
        load_balancing_policy=None,
        reconnection_policy=None,
        default_retry_policy=None,
        conviction_policy=None,
        metrics_enabled=False,
        connection_class=None,
        ssl_context=None,
        ssl_options=None,
        sockopts=None,
        cql_version=None,
        protocol_version=None,
        is_default_protocol_version=None,
        compression=True,
        max_schema_agreement_wait=10,
        control_connection_timeout=2.0,
        idle_heartbeat_interval=30,
        idle_heartbeat_timeout=60,
        schema_event_refresh_window=2,
        topology_event_refresh_window=10,
        status_event_refresh_window=2,
        prepare_on_all_hosts=True,
        reprepare_on_up=True,
        max_requests_per_connection=None,
        **kwargs
    ):
        """
        Initialize a Cluster instance to manage Cassandra connections.

        Parameters:
        - contact_points (list): List of host addresses to initially connect to
        - port (int): Port number for Cassandra connections (default: 9042)
        - executor_threads (int): Number of threads for I/O operations (default: 2)
        - auth_provider (AuthProvider): Authentication provider for credentials
        - load_balancing_policy (LoadBalancingPolicy): Policy for host selection
        - reconnection_policy (ReconnectionPolicy): Policy for reconnection delays
        - default_retry_policy (RetryPolicy): Default retry policy for failed operations
        - conviction_policy (ConvictionPolicy): Policy for marking hosts as failed
        - metrics_enabled (bool): Enable connection and request metrics (default: False)
        - connection_class: Connection implementation class
        - ssl_context: SSL context for encrypted connections
        - ssl_options (dict): SSL configuration options
        - sockopts (list): Socket options tuples
        - cql_version (str): CQL version to use
        - protocol_version (int): Native protocol version (1-4)
        - is_default_protocol_version (bool): Whether protocol_version is an implicit
          default rather than an explicit choice (inferred from the name; verify
          against the driver before relying on it)
        - compression (bool or str): Enable compression ('snappy', 'lz4', or True;
          default: True)
        - max_schema_agreement_wait (float): Max time to wait for schema agreement
          (default: 10)
        - control_connection_timeout (float): Timeout for control connection
          operations (default: 2.0)
        - idle_heartbeat_interval (float): Interval between heartbeat messages
          (default: 30)
        - idle_heartbeat_timeout (float): Timeout for heartbeat responses (default: 60)
        - schema_event_refresh_window (float): Window within which schema metadata
          is refreshed after a schema change event (default: 2)
        - topology_event_refresh_window (float): Window within which the node and
          token list is refreshed after a topology change event (default: 10)
        - status_event_refresh_window (float): Window within which the driver reacts
          to a node up/down status event (default: 2)
        - prepare_on_all_hosts (bool): Prepare statements on all hosts (default: True)
        - reprepare_on_up (bool): Re-prepare statements when hosts come back up
          (default: True)
        - max_requests_per_connection (int): Max concurrent requests per connection
        - **kwargs: Additional driver options not enumerated in this reference
        """

    def connect(self, keyspace=None):
        """
        Create a new Session for this cluster.

        Parameters:
        - keyspace (str): Default keyspace for the session (default: None)

        Returns:
        Session: A new session connected to the cluster
        """

    def shutdown(self):
        """
        Shut down this cluster and all associated sessions.

        Closes all connections and stops background threads.
        """

    def add_host(self, address, datacenter=None, rack=None, signal=True):
        """
        Add a host to the cluster.

        Parameters:
        - address (str): Host address to add
        - datacenter (str): Datacenter name
        - rack (str): Rack name
        - signal (bool): Whether to signal policy changes (default: True)
        """

    def remove_host(self, host):
        """
        Remove a host from the cluster.

        Parameters:
        - host (Host): Host object to remove
        """

    @property
    def metadata(self):
        """Metadata: Cluster metadata including keyspace and table information"""

    @property
    def metrics(self):
        """Metrics: Connection and request metrics, available when metrics_enabled is True"""
```

### Session Operations

The `Session` class executes queries and manages prepared statements within a keyspace context.

```python { .api }
class Session:
    """Execute queries and manage prepared statements within a keyspace context."""

    def execute(self, query, parameters=None, timeout=None, trace=False):
        """
        Execute a query synchronously.

        Parameters:
        - query (str or Statement): CQL query string or Statement object
        - parameters (list or dict): Query parameters for placeholder binding
        - timeout (float): Query timeout in seconds
        - trace (bool): Enable query tracing

        Returns:
        ResultSet: Query results with row data and metadata

        Raises:
        - Unavailable: Not enough replicas available
        - ReadTimeout: Read operation timed out
        - WriteTimeout: Write operation timed out
        - InvalidRequest: Invalid query or parameters
        """

    def execute_async(self, query, parameters=None, trace=False, timeout=None):
        """
        Execute a query asynchronously.

        Parameters:
        - query (str or Statement): CQL query string or Statement object
        - parameters (list or dict): Query parameters for placeholder binding
        - trace (bool): Enable query tracing
        - timeout (float): Query timeout in seconds, mirroring execute(). It is the
          last parameter so existing positional calls keep working.

        Returns:
        ResponseFuture: Future object for asynchronous result handling. Query
        failures surface through ResponseFuture.result() or registered errbacks.
        """

    def prepare(self, query):
        """
        Prepare a query for efficient repeated execution.

        Parameters:
        - query (str): CQL query string with parameter placeholders

        Returns:
        PreparedStatement: Prepared statement object for binding and execution
        """

    def shutdown(self):
        """
        Shut down this session and close all connections.
        """

    def set_keyspace(self, keyspace):
        """
        Set the default keyspace for this session.

        Parameters:
        - keyspace (str): Keyspace name to use as default
        """

    @property
    def keyspace(self):
        """str: Current default keyspace for this session"""

    @property
    def cluster(self):
        """Cluster: The cluster this session is connected to"""

    @property
    def hosts(self):
        """set: Set of Host objects in the cluster"""

    @property
    def user_type_deserializers(self):
        """dict: User-defined type deserializers"""

    @property
    def encoder(self):
        """Encoder: Parameter encoder for this session"""

    @property
    def row_factory(self):
        """callable: Factory function for creating row objects from results"""

    @row_factory.setter
    def row_factory(self, factory):
        """Set the row factory for result processing"""

    @property
    def default_timeout(self):
        """float: Default timeout for queries executed by this session"""

    @default_timeout.setter
    def default_timeout(self, timeout):
        """Set the default timeout for queries"""

    @property
    def default_consistency_level(self):
        """int: Default consistency level for queries"""

    @default_consistency_level.setter
    def default_consistency_level(self, consistency_level):
        """Set the default consistency level"""

    @property
    def default_serial_consistency_level(self):
        """int: Default serial consistency level for conditional queries"""

    @default_serial_consistency_level.setter
    def default_serial_consistency_level(self, serial_consistency_level):
        """Set the default serial consistency level"""
```

### Response Handling

Asynchronous response objects for non-blocking query execution.

```python { .api }
class ResponseFuture:
    """Handle for an in-flight query, as returned by Session.execute_async()."""

    def result(self, timeout=None):
        """
        Block until the query completes and return the result.

        Parameters:
        - timeout (float): Maximum time to wait for result

        Returns:
        ResultSet: Query results

        Raises:
        - Timeout: Operation timed out
        - Various query-specific exceptions
        """

    def get_query_trace(self, max_wait=2.0):
        """
        Get the query trace if tracing was enabled.

        Parameters:
        - max_wait (float): Maximum time to wait for trace data (default: 2.0)

        Returns:
        QueryTrace: Trace information for the executed query
        """

    def add_callback(self, fn, *args, **kwargs):
        """
        Add a callback function to be called when query completes successfully.

        Parameters:
        - fn (callable): Function to call with (result, *args, **kwargs)
        - args: Additional positional arguments for callback
        - kwargs: Additional keyword arguments for callback
        """

    def add_errback(self, fn, *args, **kwargs):
        """
        Add an error callback function to be called if query fails.

        Parameters:
        - fn (callable): Function to call with (exception, *args, **kwargs)
        - args: Additional positional arguments for callback
        - kwargs: Additional keyword arguments for callback
        """

    def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):
        """
        Add both success and error callbacks in a single call.

        Parameters:
        - callback (callable): Success callback function
        - errback (callable): Error callback function
        - callback_args (tuple): Arguments for success callback
        - callback_kwargs (dict): Keyword arguments for success callback (default: None)
        - errback_args (tuple): Arguments for error callback
        - errback_kwargs (dict): Keyword arguments for error callback (default: None)
        """

    @property
    def query(self):
        """str or Statement: The query that was executed"""

    @property
    def session(self):
        """Session: The session used to execute the query"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages of results available"""
```

### Result Processing

Result sets and paging support for handling large query results.

```python { .api }
class ResultSet:
    """Rows returned by a query, exposed one page at a time."""

    def __iter__(self):
        """Iterate over result rows"""

    def __len__(self):
        """Get the number of rows in current page"""

    def __getitem__(self, index):
        """Get a row by index"""

    @property
    def current_rows(self):
        """list: Rows in the current page"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages available"""

    @property
    def response_future(self):
        """ResponseFuture: Future object used to fetch this result"""

    @property
    def column_names(self):
        """list: Names of columns in the result set"""

    @property
    def column_types(self):
        """list: CQL types of columns in the result set"""


class PagedResult:
    """Iterator that walks every row of a query across all of its pages."""

    def __init__(self, future, session):
        """
        Initialize a paged result iterator.

        Parameters:
        - future (ResponseFuture): Initial response future
        - session (Session): Session for fetching additional pages
        """

    def __iter__(self):
        """Iterate over all rows across all pages"""
```

### Connection Pool Management

Host representation and per-host connection pool management.

```python { .api }
class Host:
    """A single Cassandra node known to the cluster."""

    def __init__(self, address, datacenter=None, rack=None):
        """
        Represent a Cassandra host in the cluster.

        Parameters:
        - address (str): Host IP address or hostname
        - datacenter (str): Datacenter name (default: None)
        - rack (str): Rack name (default: None)
        """

    @property
    def address(self):
        """str: Host address"""

    @property
    def datacenter(self):
        """str: Datacenter name"""

    @property
    def rack(self):
        """str: Rack name"""

    @property
    def is_up(self):
        """bool: Whether the host is currently up"""

    @property
    def release_version(self):
        """str: Cassandra release version on this host"""


class HostConnectionPool:
    """Pool of connections from one session to one host."""

    def __init__(self, host, host_distance, session):
        """
        Manage connections to a specific host.

        Parameters:
        - host (Host): Target host
        - host_distance (int): Distance category for this host
        - session (Session): Parent session
        """

    def borrow_connection(self, timeout):
        """
        Borrow a connection from the pool.

        Parameters:
        - timeout (float): Maximum time to wait for connection

        Returns:
        Connection: Available connection object
        """

    def return_connection(self, connection):
        """
        Return a connection to the pool.

        Parameters:
        - connection (Connection): Connection previously obtained from borrow_connection()
        """

    @property
    def host(self):
        """Host: The host this pool connects to"""

    @property
    def is_shutdown(self):
        """bool: Whether this pool has been shut down"""

    @property
    def open_count(self):
        """int: Number of open connections in the pool"""
```

### Exception Classes

Exceptions raised by cluster and session operations.

```python { .api }
class NoHostAvailable(Exception):
    """No hosts are available for connection."""

    def __init__(self, message, errors):
        """
        Parameters:
        - message (str): Error message
        - errors (dict): Dict mapping Host objects to their connection errors
        """

    @property
    def errors(self):
        """dict: Connection errors by host"""


class QueryExhausted(Exception):
    """
    All query retries have been exhausted.

    NOTE(review): upstream cassandra-driver documents QueryExhausted as raised when
    the next page of results is requested but no more pages exist (check
    has_more_pages first). Confirm which meaning this reference intends.
    """

    def __init__(self, message, last_host):
        """
        Parameters:
        - message (str): Error message
        - last_host (Host): Last host that was tried
        """


class UserTypeDoesNotExist(Exception):
    """Referenced user-defined type does not exist."""

    def __init__(self, keyspace, user_type):
        """
        Parameters:
        - keyspace (str): Keyspace name
        - user_type (str): User-defined type name
        """
```

## Usage Examples

### Basic Cluster Setup

```python
import uuid

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy

# Setup authentication
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')

# Setup load balancing: prefer nodes in the local datacenter
load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='datacenter1')

# Create cluster with custom configuration
cluster = Cluster(
    contact_points=['127.0.0.1', '127.0.0.2'],
    port=9042,
    auth_provider=auth_provider,
    load_balancing_policy=load_balancing_policy,
    protocol_version=4,
)

try:
    # Connect and select the default keyspace
    session = cluster.connect()
    session.set_keyspace('my_keyspace')

    # Execute a synchronous query; %s placeholders are bound from the parameter list
    user_id = uuid.uuid4()  # placeholder: substitute the id of an existing user
    result = session.execute("SELECT * FROM users WHERE id = %s", [user_id])
    for row in result:
        print(f"User: {row.name}")

    # Execute an asynchronous query, then block until it completes
    future = session.execute_async("SELECT * FROM users")
    result = future.result()
    for row in result:
        print(f"User: {row.name}")
finally:
    # Clean up: release connections and background threads even if a query fails
    cluster.shutdown()
```

### Prepared Statement Usage

```python
import uuid
from datetime import datetime, timezone

# Assumes an open `session` connected to a keyspace that contains a `users` table.

# Prepare the statement once; each '?' is a bind marker filled at execution time
insert_stmt = session.prepare("""
    INSERT INTO users (id, name, email, created_at)
    VALUES (?, ?, ?, ?)
""")

# Execute the same prepared statement with different parameters.
# Use a timezone-aware UTC timestamp: cassandra-driver interprets naive datetimes
# as UTC, so a naive local datetime.now() would be stored shifted by the local offset.
for name, email in [
    ('Alice Smith', 'alice@example.com'),
    ('Bob Jones', 'bob@example.com'),
]:
    session.execute(insert_stmt, [uuid.uuid4(), name, email, datetime.now(timezone.utc)])
```

### Asynchronous Operations

```python
def handle_success(result):
    """Success callback: receives the query result once the query completes."""
    print(f"Query succeeded with {len(result)} rows")
    for row in result:
        print(f"User: {row.name}")


def handle_error(exception):
    """Error callback: receives the exception raised by a failed query."""
    print(f"Query failed: {exception}")


# Execute with callbacks (assumes an open `session`).
# future.add_callbacks(handle_success, handle_error) registers both in one call.
future = session.execute_async("SELECT * FROM users")
future.add_callback(handle_success)
future.add_errback(handle_error)

# Or block until the result is available
try:
    result = future.result(timeout=10.0)
    print(f"Got {len(result)} rows")
except Exception as e:
    # Broad catch for brevity; production code should catch specific driver
    # exceptions such as ReadTimeout or Unavailable.
    print(f"Query failed: {e}")
```