or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md auth-policies.md cluster-session.md cql-types.md cqlengine-orm.md index.md metadata.md query-execution.md

docs/cluster-session.md

0

# Core Connectivity

1

2

Core cluster connection management, session handling, and connection pooling functionality. The Cluster and Session classes form the foundation of all cassandra-driver operations.

3

4

## Capabilities

5

6

### Cluster Management

7

8

The Cluster class manages connections to multiple Cassandra nodes, handles topology changes, and provides load balancing across the cluster.

9

10

```python { .api }

11

class Cluster:

12

def __init__(

13

self,

14

contact_points=None,

15

port=9042,

16

executor_threads=2,

17

auth_provider=None,

18

load_balancing_policy=None,

19

reconnection_policy=None,

20

default_retry_policy=None,

21

conviction_policy=None,

22

metrics_enabled=False,

23

connection_class=None,

24

ssl_context=None,

25

ssl_options=None,

26

sockopts=None,

27

cql_version=None,

28

protocol_version=None,

29

is_default_protocol_version=None,

30

compression=True,

31

max_schema_agreement_wait=10,

32

control_connection_timeout=2.0,

33

idle_heartbeat_interval=30,

34

idle_heartbeat_timeout=60,

35

schema_event_refresh_window=2,

36

topology_event_refresh_window=10,

37

status_event_refresh_window=2,

38

prepare_on_all_hosts=True,

39

reprepare_on_up=True,

40

max_requests_per_connection=None,

41

**kwargs

42

):

43

"""

44

Initialize a Cluster instance to manage Cassandra connections.

45

46

Parameters:

47

- contact_points (list): List of host addresses to initially connect to

48

- port (int): Port number for Cassandra connections (default: 9042)

49

- executor_threads (int): Number of threads for I/O operations

50

- auth_provider (AuthProvider): Authentication provider for credentials

51

- load_balancing_policy (LoadBalancingPolicy): Policy for host selection

52

- reconnection_policy (ReconnectionPolicy): Policy for reconnection delays

53

- default_retry_policy (RetryPolicy): Default retry policy for failed operations

54

- conviction_policy (ConvictionPolicy): Policy for marking hosts as failed

55

- metrics_enabled (bool): Enable connection and request metrics

56

- connection_class: Connection implementation class

57

- ssl_context: SSL context for encrypted connections

58

- ssl_options (dict): SSL configuration options

59

- sockopts (list): Socket options tuples

60

- cql_version (str): CQL version to use

61

- protocol_version (int): Native protocol version (1-4)

62

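- compression (bool or str): True (default) negotiates a compression algorithm supported by both driver and server; 'snappy' or 'lz4' requests that specific algorithm; False disables compression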

63

- max_schema_agreement_wait (float): Max time to wait for schema agreement

64

- control_connection_timeout (float): Timeout for control connection operations

65

- idle_heartbeat_interval (float): Interval between heartbeat messages

66

- idle_heartbeat_timeout (float): Timeout for heartbeat responses

67

- prepare_on_all_hosts (bool): Prepare statements on all hosts

68

- reprepare_on_up (bool): Re-prepare statements when hosts come back up

69

- max_requests_per_connection (int): Max concurrent requests per connection

70

"""

71

72

def connect(self, keyspace=None):

73

"""

74

Create a new Session for this cluster.

75

76

Parameters:

77

- keyspace (str): Default keyspace for the session

78

79

Returns:

80

Session: A new session connected to the cluster

81

"""

82

83

def shutdown(self):

84

"""

85

Shut down this cluster and all associated sessions.

86

Closes all connections and stops background threads.

87

"""

88

89

def add_host(self, address, datacenter=None, rack=None, signal=True):

90

"""

91

Add a host to the cluster.

92

93

Parameters:

94

- address (str): Host address to add

95

- datacenter (str): Datacenter name

96

- rack (str): Rack name

97

- signal (bool): Whether to signal policy changes

98

"""

99

100

def remove_host(self, host):

101

"""

102

Remove a host from the cluster.

103

104

Parameters:

105

- host (Host): Host object to remove

106

"""

107

108

@property

109

def metadata(self):

110

"""Metadata: Cluster metadata including keyspace and table information"""

111

112

@property

113

def metrics(self):

114

"""Metrics: Connection and request metrics if enabled"""

115

```

116

117

### Session Operations

118

119

The Session class executes queries and manages prepared statements within a keyspace context.

120

121

```python { .api }

122

class Session:

123

def execute(self, query, parameters=None, timeout=None, trace=False):

124

"""

125

Execute a query synchronously.

126

127

Parameters:

128

- query (str or Statement): CQL query string or Statement object

129

- parameters (list or dict): Query parameters for placeholder binding

130

- timeout (float): Query timeout in seconds

131

- trace (bool): Enable query tracing

132

133

Returns:

134

ResultSet: Query results with row data and metadata

135

136

Raises:

137

- Unavailable: Not enough replicas available

138

- ReadTimeout: Read operation timed out

139

- WriteTimeout: Write operation timed out

140

- InvalidRequest: Invalid query or parameters

141

"""

142

143

def execute_async(self, query, parameters=None, trace=False):

144

"""

145

Execute a query asynchronously.

146

147

Parameters:

148

- query (str or Statement): CQL query string or Statement object

149

- parameters (list or dict): Query parameters for placeholder binding

150

- trace (bool): Enable query tracing

151

152

Returns:

153

ResponseFuture: Future object for asynchronous result handling

154

"""

155

156

def prepare(self, query):

157

"""

158

Prepare a query for efficient repeated execution.

159

160

Parameters:

161

- query (str): CQL query string with parameter placeholders

162

163

Returns:

164

PreparedStatement: Prepared statement object for binding and execution

165

"""

166

167

def shutdown(self):

168

"""

169

Shut down this session and close all connections.

170

"""

171

172

def set_keyspace(self, keyspace):

173

"""

174

Set the default keyspace for this session.

175

176

Parameters:

177

- keyspace (str): Keyspace name to use as default

178

"""

179

180

@property

181

def keyspace(self):

182

"""str: Current default keyspace for this session"""

183

184

@property

185

def cluster(self):

186

"""Cluster: The cluster this session is connected to"""

187

188

@property

189

def hosts(self):

190

"""set: Set of Host objects in the cluster"""

191

192

@property

193

def user_type_deserializers(self):

194

"""dict: User-defined type deserializers"""

195

196

@property

197

def encoder(self):

198

"""Encoder: Parameter encoder for this session"""

199

200

@property

201

def row_factory(self):

202

"""callable: Factory function for creating row objects from results"""

203

204

@row_factory.setter

205

def row_factory(self, factory):

206

"""Set the row factory for result processing"""

207

208

@property

209

def default_timeout(self):

210

"""float: Default timeout for queries executed by this session"""

211

212

@default_timeout.setter

213

def default_timeout(self, timeout):

214

"""Set the default timeout for queries"""

215

216

@property

217

def default_consistency_level(self):

218

"""int: Default consistency level for queries"""

219

220

@default_consistency_level.setter

221

def default_consistency_level(self, consistency_level):

222

"""Set the default consistency level"""

223

224

@property

225

def default_serial_consistency_level(self):

226

"""int: Default serial consistency level for conditional queries"""

227

228

@default_serial_consistency_level.setter

229

def default_serial_consistency_level(self, serial_consistency_level):

230

"""Set the default serial consistency level"""

231

```

232

233

### Response Handling

234

235

Asynchronous response objects for non-blocking query execution.

236

237

```python { .api }

238

class ResponseFuture:

239

def result(self, timeout=None):

240

"""

241

Block until the query completes and return the result.

242

243

Parameters:

244

- timeout (float): Maximum time to wait for result

245

246

Returns:

247

ResultSet: Query results

248

249

Raises:

250

- Timeout: Operation timed out

251

- Other query-specific exceptions (e.g. Unavailable, ReadTimeout, WriteTimeout, InvalidRequest)

252

"""

253

254

def get_query_trace(self, max_wait=2.0):

255

"""

256

Get the query trace if tracing was enabled.

257

258

Parameters:

259

- max_wait (float): Maximum time to wait for trace data

260

261

Returns:

262

QueryTrace: Trace information for the executed query

263

"""

264

265

def add_callback(self, fn, *args, **kwargs):

266

"""

267

Add a callback function to be called when query completes successfully.

268

269

Parameters:

270

- fn (callable): Function to call with (result, *args, **kwargs)

271

- args: Additional positional arguments for callback

272

- kwargs: Additional keyword arguments for callback

273

"""

274

275

def add_errback(self, fn, *args, **kwargs):

276

"""

277

Add an error callback function to be called if query fails.

278

279

Parameters:

280

- fn (callable): Function to call with (exception, *args, **kwargs)

281

- args: Additional positional arguments for callback

282

- kwargs: Additional keyword arguments for callback

283

"""

284

285

def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):

286

"""

287

Add both success and error callbacks.

288

289

Parameters:

290

- callback (callable): Success callback function

291

- errback (callable): Error callback function

292

- callback_args (tuple): Arguments for success callback

293

- callback_kwargs (dict): Keyword arguments for success callback

294

- errback_args (tuple): Arguments for error callback

295

- errback_kwargs (dict): Keyword arguments for error callback

296

"""

297

298

@property

299

def query(self):

300

"""str or Statement: The query that was executed"""

301

302

@property

303

def session(self):

304

"""Session: The session used to execute the query"""

305

306

@property

307

def has_more_pages(self):

308

"""bool: Whether there are more pages of results available"""

309

```

310

311

### Result Processing

312

313

Result sets and paging support for handling large query results.

314

315

```python { .api }

316

class ResultSet:

317

def __iter__(self):

318

"""Iterate over result rows"""

319

320

def __len__(self):

321

"""Get the number of rows in current page"""

322

323

def __getitem__(self, index):

324

"""Get a row by index"""

325

326

@property

327

def current_rows(self):

328

"""list: Rows in the current page"""

329

330

@property

331

def has_more_pages(self):

332

"""bool: Whether there are more pages available"""

333

334

@property

335

def response_future(self):

336

"""ResponseFuture: Future object used to fetch this result"""

337

338

@property

339

def column_names(self):

340

"""list: Names of columns in the result set"""

341

342

@property

343

def column_types(self):

344

"""list: CQL types of columns in the result set"""

345

346

class PagedResult:

347

def __init__(self, future, session):

348

"""

349

Initialize a paged result iterator.

350

351

Parameters:

352

- future (ResponseFuture): Initial response future

353

- session (Session): Session for fetching additional pages

354

"""

355

356

def __iter__(self):

357

"""Iterate over all rows across all pages"""

358

```

359

360

### Connection Pool Management

361

362

Host and connection pool management for optimal performance.

363

364

```python { .api }

365

class Host:

366

def __init__(self, address, datacenter=None, rack=None):

367

"""

368

Represent a Cassandra host in the cluster.

369

370

Parameters:

371

- address (str): Host IP address or hostname

372

- datacenter (str): Datacenter name

373

- rack (str): Rack name

374

"""

375

376

@property

377

def address(self):

378

"""str: Host address"""

379

380

@property

381

def datacenter(self):

382

"""str: Datacenter name"""

383

384

@property

385

def rack(self):

386

"""str: Rack name"""

387

388

@property

389

def is_up(self):

390

"""bool: Whether the host is currently up"""

391

392

@property

393

def release_version(self):

394

"""str: Cassandra release version on this host"""

395

396

class HostConnectionPool:

397

def __init__(self, host, host_distance, session):

398

"""

399

Manage connections to a specific host.

400

401

Parameters:

402

- host (Host): Target host

403

- host_distance (int): Distance category for this host

404

- session (Session): Parent session

405

"""

406

407

def borrow_connection(self, timeout):

408

"""

409

Borrow a connection from the pool.

410

411

Parameters:

412

- timeout (float): Maximum time to wait for connection

413

414

Returns:

415

Connection: Available connection object

416

"""

417

418

def return_connection(self, connection):

419

"""

420

Return a connection to the pool.

421

422

Parameters:

423

- connection (Connection): Connection to return

424

"""

425

426

@property

427

def host(self):

428

"""Host: The host this pool connects to"""

429

430

@property

431

def is_shutdown(self):

432

"""bool: Whether this pool has been shut down"""

433

434

@property

435

def open_count(self):

436

"""int: Number of open connections in the pool"""

437

```

438

439

### Exception Classes

440

441

Specific exceptions for cluster and session operations.

442

443

```python { .api }

444

class NoHostAvailable(Exception):

445

"""No hosts are available for connection."""

446

447

def __init__(self, message, errors):

448

"""

449

Parameters:

450

- message (str): Error message

451

- errors (dict): Dict mapping Host objects to their connection errors

452

"""

453

454

@property

455

def errors(self):

456

"""dict: Connection errors by host"""

457

458

class QueryExhausted(Exception):

459

"""All query retries have been exhausted."""

460

461

def __init__(self, message, last_host):

462

"""

463

Parameters:

464

- message (str): Error message

465

- last_host (Host): Last host that was tried

466

"""

467

468

class UserTypeDoesNotExist(Exception):

469

"""Referenced user-defined type does not exist."""

470

471

def __init__(self, keyspace, user_type):

472

"""

473

Parameters:

474

- keyspace (str): Keyspace name

475

- user_type (str): User-defined type name

476

"""

477

```

478

479

## Usage Examples

480

481

### Basic Cluster Setup

482

483

```python

484

from cassandra.cluster import Cluster

485

from cassandra.auth import PlainTextAuthProvider

486

from cassandra.policies import DCAwareRoundRobinPolicy

487

488

# Setup authentication

489

auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')

490

491

# Setup load balancing

492

load_balancing_policy = DCAwareRoundRobinPolicy(local_dc='datacenter1')

493

494

# Create cluster with custom configuration

495

cluster = Cluster(

496

contact_points=['127.0.0.1', '127.0.0.2'],

497

port=9042,

498

auth_provider=auth_provider,

499

load_balancing_policy=load_balancing_policy,

500

protocol_version=4

501

)

502

503

# Connect and execute queries

504

session = cluster.connect()

505

session.set_keyspace('my_keyspace')

506

507

# Execute synchronous query

508

result = session.execute("SELECT * FROM users WHERE id = %s", [user_id])

509

for row in result:

510

print(f"User: {row.name}")

511

512

# Execute asynchronous query

513

future = session.execute_async("SELECT * FROM users")

514

result = future.result()

515

516

# Clean up

517

cluster.shutdown()

518

```

519

520

### Prepared Statement Usage

521

522

```python

523

# Prepare a statement for repeated execution

524

insert_stmt = session.prepare("""

525

INSERT INTO users (id, name, email, created_at)

526

VALUES (?, ?, ?, ?)

527

""")

528

529

# Execute with different parameters

530

import uuid

531

from datetime import datetime

532

533

session.execute(insert_stmt, [

534

uuid.uuid4(),

535

'Alice Smith',

536

'alice@example.com',

537

datetime.now()

538

])

539

540

session.execute(insert_stmt, [

541

uuid.uuid4(),

542

'Bob Jones',

543

'bob@example.com',

544

datetime.now()

545

])

546

```

547

548

### Asynchronous Operations

549

550

```python

551

def handle_success(result):

552

print(f"Query succeeded with {len(result)} rows")

553

for row in result:

554

print(f"User: {row.name}")

555

556

def handle_error(exception):

557

print(f"Query failed: {exception}")

558

559

# Execute with callbacks

560

future = session.execute_async("SELECT * FROM users")

561

future.add_callback(handle_success)

562

future.add_errback(handle_error)

563

564

# Or wait for result

565

try:

566

result = future.result(timeout=10.0)

567

print(f"Got {len(result)} rows")

568

except Exception as e:

569

print(f"Query failed: {e}")

570

```