
# Asynchronous I/O

This page covers the driver's I/O reactor implementations, concurrent execution utilities, and asynchronous operation patterns for high-performance applications. The cassandra-driver supports multiple async frameworks and provides utilities for concurrent query execution.

## Capabilities


### I/O Reactor Implementations


Connection implementations for different asynchronous frameworks.

```python { .api }
class AsyncoreConnection:
    """
    Connection implementation using Python's asyncore module.

    This is the default connection class providing basic asynchronous I/O
    without external dependencies.
    """

class GeventConnection:
    """
    Connection implementation using Gevent for async I/O.

    Requires: gevent package
    Usage: Set as connection_class in Cluster configuration
    """

class EventletConnection:
    """
    Connection implementation using Eventlet for async I/O.

    Requires: eventlet package
    Usage: Set as connection_class in Cluster configuration
    """

class TwistedConnection:
    """
    Connection implementation using the Twisted framework.

    Requires: Twisted package
    Usage: Set as connection_class in Cluster configuration
    """

class LibevConnection:
    """
    Connection implementation using libev for high-performance I/O.

    Requires: libev system library and C extension compilation
    Usage: Set as connection_class in Cluster configuration
    """
```

### Concurrent Execution


Utilities for executing multiple queries concurrently for improved throughput.

```python { .api }
def execute_concurrent(session, statements_and_parameters, concurrency=100, results_generator=False):
    """
    Execute multiple statements concurrently.

    Parameters:
    - session (Session): Session to execute queries on
    - statements_and_parameters (iterable): Sequence of (statement, parameters) tuples
    - concurrency (int): Maximum number of concurrent requests
    - results_generator (bool): Return a generator instead of a list

    Returns:
    list or generator: Results from query execution, with None for failed queries

    Example:
        statements = [
            (SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Alice']),
            (SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Bob']),
        ]
        results = execute_concurrent(session, statements)
    """

def execute_concurrent_with_args(session, statement, parameters, concurrency=100, results_generator=False):
    """
    Execute a single statement with multiple parameter sets concurrently.

    Parameters:
    - session (Session): Session to execute queries on
    - statement (str or Statement): Statement to execute repeatedly
    - parameters (iterable): Sequence of parameter lists/dicts
    - concurrency (int): Maximum number of concurrent requests
    - results_generator (bool): Return a generator instead of a list

    Returns:
    list or generator: Results from query execution, with None for failed queries

    Example:
        statement = "INSERT INTO users (id, name) VALUES (%s, %s)"
        parameters = [
            [uuid.uuid4(), 'Alice'],
            [uuid.uuid4(), 'Bob'],
            [uuid.uuid4(), 'Charlie']
        ]
        results = execute_concurrent_with_args(session, statement, parameters)
    """
```
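
The `results_generator` flag matters for large workloads: the sketch below streams results as they complete rather than collecting them into one large list. This is a minimal sketch under the result semantics described above; the `users_by_id` table and an existing `session` are assumptions.

```python
import uuid
from cassandra.concurrent import execute_concurrent_with_args

# Hypothetical lookup workload; users_by_id is an assumed table.
ids = [[uuid.uuid4()] for _ in range(100000)]

# With results_generator=True, results are yielded as they complete instead
# of being accumulated into one large list, bounding memory use.
results = execute_concurrent_with_args(
    session,
    "SELECT * FROM users_by_id WHERE id = %s",
    ids,
    concurrency=100,
    results_generator=True,
)

# Per the semantics above, None marks a failed query.
found = sum(1 for result in results if result is not None)
print(f"{found} of {len(ids)} lookups succeeded")
```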

### Asynchronous Response Handling


Enhanced response future handling for asynchronous operations.

```python { .api }
class ResponseFuture:
    """
    Future object representing an asynchronous query execution.
    """

    def result(self, timeout=None):
        """
        Block and wait for the query result.

        Parameters:
        - timeout (float): Maximum time to wait in seconds

        Returns:
        ResultSet: Query results

        Raises:
        - Timeout: If timeout is exceeded
        - Various query-specific exceptions
        """

    def get_query_trace(self, max_wait=2.0):
        """
        Get query trace information if tracing was enabled.

        Parameters:
        - max_wait (float): Maximum time to wait for trace

        Returns:
        QueryTrace: Trace information or None
        """

    def add_callback(self, fn, *args, **kwargs):
        """
        Add success callback to be executed when the query completes successfully.

        Parameters:
        - fn (callable): Callback function
        - args: Additional arguments for callback
        - kwargs: Additional keyword arguments for callback

        The callback will be called with: fn(result, *args, **kwargs)
        """

    def add_errback(self, fn, *args, **kwargs):
        """
        Add error callback to be executed when the query fails.

        Parameters:
        - fn (callable): Error callback function
        - args: Additional arguments for callback
        - kwargs: Additional keyword arguments for callback

        The errback will be called with: fn(exception, *args, **kwargs)
        """

    def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):
        """
        Add both success and error callbacks.

        Parameters:
        - callback (callable): Success callback
        - errback (callable): Error callback
        - callback_args (tuple): Arguments for success callback
        - callback_kwargs (dict): Keyword arguments for success callback
        - errback_args (tuple): Arguments for error callback
        - errback_kwargs (dict): Keyword arguments for error callback
        """

    @property
    def query(self):
        """str or Statement: The query that was executed"""

    @property
    def session(self):
        """Session: The session used for execution"""

    @property
    def coordinator_host(self):
        """Host: The coordinator host for this query"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages of results"""

    @property
    def warnings(self):
        """list: List of warning messages from the server"""

    @property
    def custom_payload(self):
        """dict: Custom payload returned by the server"""

    @property
    def is_schema_agreed(self):
        """bool: Whether schema agreement was reached"""
```
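
`add_callbacks` registers success and error handlers in one call; pairing the future with a `threading.Event` lets a caller wait for completion without polling. A minimal sketch, assuming an existing `session` and `users` table:

```python
import threading

class QueryWaiter:
    """Collects the result (or error) of one async query and signals completion."""

    def __init__(self, future):
        self.rows = None
        self.error = None
        self.done = threading.Event()
        # Register both handlers at once via add_callbacks.
        future.add_callbacks(self._on_success, self._on_error)

    def _on_success(self, rows):
        self.rows = rows
        self.done.set()

    def _on_error(self, exc):
        self.error = exc
        self.done.set()

waiter = QueryWaiter(session.execute_async("SELECT * FROM users"))
waiter.done.wait(timeout=10.0)  # block only where the caller chooses to
if waiter.error is not None:
    print(f"Query failed: {waiter.error}")
```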

### Connection Pool Events


Event-driven connection pool management for monitoring and debugging.

```python { .api }
class HostConnectionPool:
    """
    Connection pool for a specific host with async capabilities.
    """

    def get_connections(self):
        """
        Get all connections in the pool.

        Returns:
        set: Set of active connections
        """

    def return_connection(self, connection):
        """
        Return a connection to the pool.

        Parameters:
        - connection: Connection object to return
        """

    def shutdown(self):
        """
        Shutdown the connection pool and close all connections.
        """

    @property
    def host(self):
        """Host: The host this pool connects to"""

    @property
    def is_shutdown(self):
        """bool: Whether the pool has been shut down"""

    @property
    def open_count(self):
        """int: Number of open connections"""
```

## Usage Examples


### Basic Asynchronous Operations

```python
from cassandra.cluster import Cluster

cluster = Cluster()
session = cluster.connect('keyspace1')

# Execute query asynchronously
future = session.execute_async("SELECT * FROM users")

# Option 1: Block and wait for result
result = future.result(timeout=10.0)
for row in result:
    print(f"User: {row.name}")

# Option 2: Use callbacks
def handle_success(result):
    print(f"Query returned {len(result)} rows")
    for row in result:
        print(f"User: {row.name}")

def handle_error(exception):
    print(f"Query failed: {exception}")

future = session.execute_async("SELECT * FROM users")
future.add_callback(handle_success)
future.add_errback(handle_error)

# Continue with other work while query executes in background
print("Query executing in background...")
```

### Using Different I/O Reactors

```python
from cassandra.cluster import Cluster
from cassandra.io.geventreactor import GeventConnection
from cassandra.io.eventletreactor import EventletConnection
from cassandra.io.twistedreactor import TwistedConnection
from cassandra.io.libevreactor import LibevConnection

# Gevent reactor (requires: pip install gevent)
cluster_gevent = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=GeventConnection
)

# Eventlet reactor (requires: pip install eventlet)
cluster_eventlet = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=EventletConnection
)

# Twisted reactor (requires: pip install twisted)
cluster_twisted = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=TwistedConnection
)

# Libev reactor (requires libev system library)
cluster_libev = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=LibevConnection
)

# Use the appropriate reactor for your application
session = cluster_gevent.connect()
```
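
If you use the gevent or eventlet reactors, note that both frameworks generally expect the standard library to be monkey-patched before the driver (or anything else that touches sockets) is imported. A minimal sketch for the gevent case; eventlet has the analogous `eventlet.monkey_patch()`:

```python
# Must run before importing cassandra (or any module that uses sockets),
# so blocking socket calls are replaced with gevent-friendly ones.
from gevent import monkey
monkey.patch_all()

from cassandra.cluster import Cluster
from cassandra.io.geventreactor import GeventConnection

cluster = Cluster(connection_class=GeventConnection)
session = cluster.connect()
```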

### Concurrent Query Execution

```python
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
from cassandra.query import SimpleStatement
import uuid

# Example 1: Execute different statements concurrently
statements_and_params = [
    (SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Alice', 'alice@example.com']),
    (SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Bob', 'bob@example.com']),
    (SimpleStatement("INSERT INTO posts (id, author, title) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Alice', 'My First Post']),
    (SimpleStatement("UPDATE counters SET count = count + 1 WHERE id = %s"),
     ['total_users'])
]

# Execute all statements concurrently
results = execute_concurrent(session, statements_and_params, concurrency=50)

# Check results
for i, result in enumerate(results):
    if result is None:
        print(f"Statement {i} failed")
    else:
        print(f"Statement {i} succeeded")

# Example 2: Execute same statement with different parameters
insert_statement = SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)")
user_data = [
    [uuid.uuid4(), 'User1', 'user1@example.com'],
    [uuid.uuid4(), 'User2', 'user2@example.com'],
    [uuid.uuid4(), 'User3', 'user3@example.com'],
    # ... many more users
]

# Execute all inserts concurrently
results = execute_concurrent_with_args(
    session,
    insert_statement,
    user_data,
    concurrency=100
)

successful_inserts = sum(1 for result in results if result is not None)
print(f"Successfully inserted {successful_inserts} users")
```
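
For a statement executed many times, preparing it once is generally more efficient than re-sending a `SimpleStatement` per request; note that prepared statements bind with `?` placeholders while simple statements use `%s`. A minimal sketch against the same assumed `users` table:

```python
import uuid
from cassandra.concurrent import execute_concurrent_with_args

# Prepare once: the server parses the query a single time, and the driver
# can route each execution by partition key.
prepared = session.prepare("INSERT INTO users (id, name, email) VALUES (?, ?, ?)")

user_data = [
    [uuid.uuid4(), f'User{i}', f'user{i}@example.com']
    for i in range(1000)
]

results = execute_concurrent_with_args(session, prepared, user_data, concurrency=100)
```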

### Advanced Callback Patterns

```python
import threading
import uuid
from collections import defaultdict

class AsyncQueryManager:
    def __init__(self, session):
        self.session = session
        self.results = defaultdict(list)
        self.errors = defaultdict(list)
        self.lock = threading.Lock()
        self.completed_count = 0
        self.total_queries = 0

    def execute_queries(self, query_groups):
        """Execute multiple groups of queries with organized results."""
        self.total_queries = sum(len(queries) for queries in query_groups.values())

        for group_name, queries in query_groups.items():
            for query, params in queries:
                future = self.session.execute_async(query, params)
                future.add_callback(self._handle_success, group_name)
                future.add_errback(self._handle_error, group_name)

    def _handle_success(self, result, group_name):
        with self.lock:
            self.results[group_name].append(result)
            self.completed_count += 1
            self._check_completion()

    def _handle_error(self, error, group_name):
        with self.lock:
            self.errors[group_name].append(error)
            self.completed_count += 1
            self._check_completion()

    def _check_completion(self):
        if self.completed_count >= self.total_queries:
            print("All queries completed!")
            self._print_summary()

    def _print_summary(self):
        # Include groups that produced only errors as well as only successes
        for group_name in set(self.results) | set(self.errors):
            success_count = len(self.results[group_name])
            error_count = len(self.errors[group_name])
            print(f"{group_name}: {success_count} successful, {error_count} failed")

# Usage
manager = AsyncQueryManager(session)

query_groups = {
    'user_inserts': [
        ("INSERT INTO users (id, name) VALUES (%s, %s)", [uuid.uuid4(), f'User{i}'])
        for i in range(100)
    ],
    'post_inserts': [
        ("INSERT INTO posts (id, title) VALUES (%s, %s)", [uuid.uuid4(), f'Post{i}'])
        for i in range(50)
    ],
    'analytics_queries': [
        ("SELECT COUNT(*) FROM users", []),
        ("SELECT COUNT(*) FROM posts", []),
        ("SELECT * FROM recent_activity LIMIT 10", [])
    ]
}

manager.execute_queries(query_groups)
```

### Asynchronous Pagination

```python
from threading import Event
from cassandra.query import SimpleStatement

class AsyncPaginator:
    def __init__(self, session, query, page_size=1000):
        self.session = session
        self.query = query
        self.page_size = page_size
        self.callbacks = []
        self.error_callbacks = []
        self.future = None
        self.finished = Event()

    def add_page_callback(self, callback):
        """Add callback to be called for each page of results."""
        self.callbacks.append(callback)

    def add_error_callback(self, callback):
        """Add callback to be called on errors."""
        self.error_callbacks.append(callback)

    def start(self):
        """Start paginating through results."""
        statement = SimpleStatement(self.query, fetch_size=self.page_size)
        self.future = self.session.execute_async(statement)
        self.future.add_callbacks(self._handle_page, self._handle_error)

    def _handle_page(self, rows):
        """Handle a page of results."""
        # Process the current page
        for callback in self.callbacks:
            callback(rows)

        # Fetch the next page asynchronously; the registered callbacks are
        # invoked again for each subsequent page.
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            print("Pagination complete")
            self.finished.set()

    def _handle_error(self, error):
        """Handle pagination errors."""
        for callback in self.error_callbacks:
            callback(error)
        self.finished.set()

# Usage
def process_page(rows):
    print(f"Processing page with {len(rows)} rows")
    for row in rows:
        # Process each row
        pass

def handle_error(error):
    print(f"Pagination error: {error}")

paginator = AsyncPaginator(session, "SELECT * FROM large_table")
paginator.add_page_callback(process_page)
paginator.add_error_callback(handle_error)
paginator.start()

# Continue with other work while pagination happens in background
```

### Asynchronous Batch Processing

```python
import uuid
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from cassandra.concurrent import execute_concurrent

class AsyncBatchProcessor:
    def __init__(self, session, batch_size=1000, concurrency=10):
        self.session = session
        self.batch_size = batch_size
        self.concurrency = concurrency
        self.executor = ThreadPoolExecutor(max_workers=concurrency)

    def process_records(self, records, process_func):
        """Process records in batches asynchronously."""

        # Split records into batches
        batches = [
            records[i:i + self.batch_size]
            for i in range(0, len(records), self.batch_size)
        ]

        print(f"Processing {len(records)} records in {len(batches)} batches")

        # Process batches concurrently
        futures = []
        for batch in batches:
            future = self.executor.submit(self._process_batch, batch, process_func)
            futures.append(future)

        # Wait for all batches to complete
        results = []
        for future in futures:
            try:
                result = future.result(timeout=60)
                results.append(result)
            except Exception as e:
                print(f"Batch processing error: {e}")
                results.append(None)

        return results

    def _process_batch(self, batch, process_func):
        """Process a single batch of records."""
        statements_and_params = []

        for record in batch:
            query, params = process_func(record)
            statements_and_params.append((query, params))

        # Execute the batch's statements concurrently
        results = execute_concurrent(
            self.session,
            statements_and_params,
            concurrency=self.concurrency
        )

        return results

# Usage
def create_insert_statement(user_data):
    """Convert user data to an insert statement."""
    query = "INSERT INTO users (id, name, email, created_at) VALUES (%s, %s, %s, %s)"
    params = [
        user_data['id'],
        user_data['name'],
        user_data['email'],
        datetime.utcnow()
    ]
    return query, params

# Process large dataset
user_records = [
    {'id': uuid.uuid4(), 'name': f'User{i}', 'email': f'user{i}@example.com'}
    for i in range(10000)
]

processor = AsyncBatchProcessor(session, batch_size=500, concurrency=20)
results = processor.process_records(user_records, create_insert_statement)

# Analyze results
total_batches = len(results)
successful_batches = sum(1 for r in results if r is not None)
print(f"Processed {total_batches} batches, {successful_batches} successful")
```
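
The "batches" above are client-side chunks executed concurrently for throughput. Cassandra's server-side `BatchStatement` is a different tool: it groups writes for atomicity, ideally within a single partition, and is not a throughput optimization. A minimal sketch, assuming the same `users` table:

```python
import uuid
from cassandra.query import BatchStatement

# Server-side batch: use for atomicity of related writes within one partition;
# large multi-partition batches put pressure on the coordinator.
insert = session.prepare("INSERT INTO users (id, name, email) VALUES (?, ?, ?)")

batch = BatchStatement()
batch.add(insert, (uuid.uuid4(), 'Alice', 'alice@example.com'))
batch.add(insert, (uuid.uuid4(), 'Bob', 'bob@example.com'))
session.execute(batch)
```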

### Connection Pool Monitoring

```python
class ConnectionPoolMonitor:
    def __init__(self, cluster, session):
        self.cluster = cluster
        self.session = session

    def get_pool_stats(self):
        """Get connection pool statistics for all hosts."""
        stats = {}

        if self.cluster.metadata:
            for host in self.cluster.metadata.all_hosts():
                # Note: _pools is a private Session attribute and may change
                # between driver versions.
                pool = self.session._pools.get(host)
                if pool:
                    stats[host.address] = {
                        'host_state': 'UP' if host.is_up else 'DOWN',
                        'datacenter': host.datacenter,
                        'rack': host.rack,
                        'open_connections': pool.open_count,
                        'is_pool_shutdown': pool.is_shutdown,
                        'pool_size': len(pool.get_connections()) if not pool.is_shutdown else 0
                    }

        return stats

    def print_pool_summary(self):
        """Print a summary of connection pool status."""
        stats = self.get_pool_stats()

        print("Connection Pool Summary:")
        print("-" * 60)

        for host_address, host_stats in stats.items():
            print(f"Host: {host_address}")
            print(f"  State: {host_stats['host_state']}")
            print(f"  DC/Rack: {host_stats['datacenter']}/{host_stats['rack']}")
            print(f"  Open Connections: {host_stats['open_connections']}")
            print(f"  Pool Size: {host_stats['pool_size']}")
            print()

# Usage
import time
import threading

session = cluster.connect()
monitor = ConnectionPoolMonitor(cluster, session)

# Monitor pool stats periodically
def monitor_pools():
    while True:
        monitor.print_pool_summary()
        time.sleep(30)  # Check every 30 seconds

# Start monitoring in background thread
monitor_thread = threading.Thread(target=monitor_pools, daemon=True)
monitor_thread.start()

# Your application continues running
# ... perform queries ...
```

### Asynchronous Query Timeout Handling

```python
import time
from cassandra import OperationTimedOut

class TimeoutHandler:
    def __init__(self, session, default_timeout=10.0):
        self.session = session
        self.default_timeout = default_timeout
        self.timeout_stats = {
            'total_queries': 0,
            'timeouts': 0,
            'retries': 0,
            'failures': 0
        }

    def execute_with_retry(self, query, params=None, max_retries=3, timeout=None):
        """Execute query with timeout handling and retries."""

        timeout = timeout or self.default_timeout

        for attempt in range(max_retries + 1):
            try:
                self.timeout_stats['total_queries'] += 1

                future = self.session.execute_async(query, params)
                return future.result(timeout=timeout)

            except OperationTimedOut:
                self.timeout_stats['timeouts'] += 1

                if attempt < max_retries:
                    self.timeout_stats['retries'] += 1
                    print(f"Query timed out, retrying (attempt {attempt + 1}/{max_retries})")

                    # Exponential backoff
                    time.sleep(2 ** attempt)
                    continue
                else:
                    self.timeout_stats['failures'] += 1
                    print(f"Query failed after {max_retries} retries")
                    raise

            except Exception as e:
                self.timeout_stats['failures'] += 1
                print(f"Query failed with non-timeout error: {e}")
                raise

    def get_timeout_stats(self):
        """Get timeout statistics."""
        stats = self.timeout_stats.copy()
        if stats['total_queries'] > 0:
            stats['timeout_rate'] = stats['timeouts'] / stats['total_queries']
            stats['success_rate'] = (stats['total_queries'] - stats['failures']) / stats['total_queries']
        return stats

# Usage
timeout_handler = TimeoutHandler(session, default_timeout=5.0)

# Execute queries with timeout handling
try:
    result = timeout_handler.execute_with_retry(
        "SELECT * FROM slow_table WHERE complex_condition = %s",
        params=['some_value'],
        max_retries=2,
        timeout=15.0
    )
    print(f"Query succeeded with {len(result.current_rows)} results")

except OperationTimedOut:
    print("Query timed out after all retries")
except Exception as e:
    print(f"Query failed: {e}")

# Check timeout statistics
stats = timeout_handler.get_timeout_stats()
print(f"Timeout statistics: {stats}")
```