# Asynchronous I/O

I/O reactor implementations, concurrent execution utilities, and asynchronous operation patterns for high-performance applications. The cassandra-driver supports multiple async frameworks and provides utilities for concurrent query execution.

## Capabilities

### I/O Reactor Implementations

Connection implementations for different asynchronous frameworks.

```python { .api }
class AsyncoreConnection:
    """
    Connection implementation using Python's asyncore module.

    This is the default connection class, providing basic asynchronous I/O
    without external dependencies.
    """

class GeventConnection:
    """
    Connection implementation using Gevent for async I/O.

    Requires: gevent package
    Usage: Set as connection_class in Cluster configuration
    """

class EventletConnection:
    """
    Connection implementation using Eventlet for async I/O.

    Requires: eventlet package
    Usage: Set as connection_class in Cluster configuration
    """

class TwistedConnection:
    """
    Connection implementation using the Twisted framework.

    Requires: Twisted package
    Usage: Set as connection_class in Cluster configuration
    """

class LibevConnection:
    """
    Connection implementation using libev for high-performance I/O.

    Requires: libev system library and C extension compilation
    Usage: Set as connection_class in Cluster configuration
    """
```
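
A reactor can also be selected conditionally at startup. A minimal sketch, assuming only that importing `cassandra.io.libevreactor` raises `ImportError` when the C extension is unavailable:

```python
from cassandra.cluster import Cluster

# Prefer libev when its C extension is available; otherwise fall back to
# the default connection class by passing no connection_class at all.
try:
    from cassandra.io.libevreactor import LibevConnection
    cluster = Cluster(contact_points=['127.0.0.1'],
                      connection_class=LibevConnection)
except ImportError:
    cluster = Cluster(contact_points=['127.0.0.1'])
```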

### Concurrent Execution

Utilities for executing multiple queries concurrently for improved throughput.

```python { .api }
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False):
    """
    Execute multiple statements concurrently.

    Parameters:
    - session (Session): Session to execute queries on
    - statements_and_parameters (iterable): Sequence of (statement, parameters) tuples
    - concurrency (int): Maximum number of concurrent requests
    - raise_on_first_error (bool): Raise the first error encountered instead of recording it
    - results_generator (bool): Return generator instead of list

    Returns:
    list or generator: A (success, result_or_exc) pair per statement, where
    result_or_exc is the exception for failed queries

    Example:
    statements = [
        (SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Alice']),
        (SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Bob']),
    ]
    results = execute_concurrent(session, statements)
    """

def execute_concurrent_with_args(session, statement, parameters, concurrency=100, raise_on_first_error=True, results_generator=False):
    """
    Execute a single statement with multiple parameter sets concurrently.

    Parameters:
    - session (Session): Session to execute queries on
    - statement (str or Statement): Statement to execute repeatedly
    - parameters (iterable): Sequence of parameter lists/dicts
    - concurrency (int): Maximum number of concurrent requests
    - raise_on_first_error (bool): Raise the first error encountered instead of recording it
    - results_generator (bool): Return generator instead of list

    Returns:
    list or generator: A (success, result_or_exc) pair per parameter set, where
    result_or_exc is the exception for failed queries

    Example:
    statement = session.prepare("INSERT INTO users (id, name) VALUES (?, ?)")
    parameters = [
        [uuid.uuid4(), 'Alice'],
        [uuid.uuid4(), 'Bob'],
        [uuid.uuid4(), 'Charlie']
    ]
    results = execute_concurrent_with_args(session, statement, parameters)
    """
```
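
For very large workloads, materializing every result defeats the purpose of streaming input. A minimal sketch of the generator mode, assuming the `(success, result_or_exc)` result shape described above (`count_failures` and `rows` are illustrative names, not driver API):

```python
from cassandra.concurrent import execute_concurrent_with_args

def count_failures(session, rows):
    insert = session.prepare("INSERT INTO users (id, name) VALUES (?, ?)")
    failures = 0
    results = execute_concurrent_with_args(
        session, insert, rows,
        concurrency=100,
        raise_on_first_error=False,  # record failures instead of raising
        results_generator=True       # yield results lazily, one at a time
    )
    for success, result_or_exc in results:
        if not success:
            failures += 1  # result_or_exc holds the exception on failure
    return failures
```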

### Asynchronous Response Handling

Enhanced response future handling for asynchronous operations.

```python { .api }
class ResponseFuture:
    """
    Future object representing an asynchronous query execution.
    """

    def result(self, timeout=None):
        """
        Block and wait for the query result.

        Parameters:
        - timeout (float): Maximum time to wait in seconds

        Returns:
        ResultSet: Query results

        Raises:
        - OperationTimedOut: If timeout is exceeded
        - Various query-specific exceptions
        """

    def get_query_trace(self, max_wait=2.0):
        """
        Get query trace information if tracing was enabled.

        Parameters:
        - max_wait (float): Maximum time to wait for trace

        Returns:
        QueryTrace: Trace information or None
        """

    def add_callback(self, fn, *args, **kwargs):
        """
        Add success callback to be executed when the query completes successfully.

        Parameters:
        - fn (callable): Callback function
        - args: Additional arguments for callback
        - kwargs: Additional keyword arguments for callback

        The callback will be called with: fn(result, *args, **kwargs)

        Note: callbacks run on the driver's event loop thread and must not block.
        """

    def add_errback(self, fn, *args, **kwargs):
        """
        Add error callback to be executed when the query fails.

        Parameters:
        - fn (callable): Error callback function
        - args: Additional arguments for callback
        - kwargs: Additional keyword arguments for callback

        The errback will be called with: fn(exception, *args, **kwargs)
        """

    def add_callbacks(self, callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_kwargs=None):
        """
        Add both success and error callbacks.

        Parameters:
        - callback (callable): Success callback
        - errback (callable): Error callback
        - callback_args (tuple): Arguments for success callback
        - callback_kwargs (dict): Keyword arguments for success callback
        - errback_args (tuple): Arguments for error callback
        - errback_kwargs (dict): Keyword arguments for error callback
        """

    @property
    def query(self):
        """str or Statement: The query that was executed"""

    @property
    def session(self):
        """Session: The session used for execution"""

    @property
    def coordinator_host(self):
        """Host: The coordinator host for this query"""

    @property
    def has_more_pages(self):
        """bool: Whether there are more pages of results"""

    @property
    def warnings(self):
        """list: List of warning messages from the server"""

    @property
    def custom_payload(self):
        """dict: Custom payload returned by the server"""

    @property
    def is_schema_agreed(self):
        """bool: Whether schema agreement was reached"""
```
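
`ResponseFuture` is not an asyncio awaitable, but its callbacks make bridging straightforward. A sketch, assuming an asyncio loop running in the main thread (`to_asyncio_future` is a hypothetical helper, not part of the driver):

```python
import asyncio

def to_asyncio_future(response_future, loop):
    """Wrap a driver ResponseFuture in an asyncio.Future (hypothetical helper)."""
    aio_future = loop.create_future()

    def on_success(rows):
        # Driver callbacks fire on its event loop thread; hand the result
        # back to the asyncio loop thread-safely.
        loop.call_soon_threadsafe(aio_future.set_result, rows)

    def on_error(exc):
        loop.call_soon_threadsafe(aio_future.set_exception, exc)

    response_future.add_callbacks(on_success, on_error)
    return aio_future

async def fetch_users(session):
    loop = asyncio.get_running_loop()
    rows = await to_asyncio_future(session.execute_async("SELECT * FROM users"), loop)
    return rows
```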

### Connection Pool Events

Event-driven connection pool management for monitoring and debugging.

```python { .api }
class HostConnectionPool:
    """
    Connection pool for a specific host with async capabilities.
    """

    def get_connections(self):
        """
        Get all connections in the pool.

        Returns:
        set: Set of active connections
        """

    def return_connection(self, connection):
        """
        Return a connection to the pool.

        Parameters:
        - connection: Connection object to return
        """

    def shutdown(self):
        """
        Shut down the connection pool and close all connections.
        """

    @property
    def host(self):
        """Host: The host this pool connects to"""

    @property
    def is_shutdown(self):
        """bool: Whether the pool has been shut down"""

    @property
    def open_count(self):
        """int: Number of open connections"""
```
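
Pool lifecycle follows host state, which the driver exposes through listener hooks. A sketch using `Cluster.register_listener` with a `cassandra.policies.HostStateListener` subclass:

```python
from cassandra.policies import HostStateListener

class PoolEventLogger(HostStateListener):
    """Log the host state transitions that drive pool creation and teardown."""

    def on_up(self, host):
        print(f"Host up, pool usable again: {host}")

    def on_down(self, host):
        print(f"Host down, pool unavailable: {host}")

    def on_add(self, host):
        print(f"Host added to cluster: {host}")

    def on_remove(self, host):
        print(f"Host removed from cluster: {host}")

# Given an existing Cluster instance:
# cluster.register_listener(PoolEventLogger())
```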

## Usage Examples

### Basic Asynchronous Operations

```python
from cassandra.cluster import Cluster

cluster = Cluster()
session = cluster.connect('keyspace1')

# Execute query asynchronously
future = session.execute_async("SELECT * FROM users")

# Option 1: Block and wait for result
result = future.result(timeout=10.0)
for row in result:
    print(f"User: {row.name}")

# Option 2: Use callbacks
def handle_success(result):
    print(f"Query returned {len(result)} rows")
    for row in result:
        print(f"User: {row.name}")

def handle_error(exception):
    print(f"Query failed: {exception}")

future = session.execute_async("SELECT * FROM users")
future.add_callback(handle_success)
future.add_errback(handle_error)

# Continue with other work while the query executes in the background
print("Query executing in background...")
```
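
Because callbacks fire on the driver's event loop thread, the main thread needs an explicit signal to know when work has finished (and callbacks themselves must not block). A minimal sketch using `threading.Event`:

```python
import threading

done = threading.Event()

def on_done(rows):
    print(f"Got {len(rows)} rows")
    done.set()

def on_fail(exc):
    print(f"Failed: {exc}")
    done.set()

future = session.execute_async("SELECT * FROM users")
future.add_callbacks(on_done, on_fail)

done.wait(timeout=30)  # block the calling thread until either callback fires
```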

### Using Different I/O Reactors

```python
from cassandra.cluster import Cluster

# Each reactor import needs its backing package installed; import only
# the one your application actually uses.

# Gevent reactor (requires: pip install gevent).
# In a real application, apply gevent.monkey.patch_all() before any
# cassandra imports.
from cassandra.io.geventreactor import GeventConnection

cluster_gevent = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=GeventConnection
)

# Eventlet reactor (requires: pip install eventlet)
from cassandra.io.eventletreactor import EventletConnection

cluster_eventlet = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=EventletConnection
)

# Twisted reactor (requires: pip install twisted)
from cassandra.io.twistedreactor import TwistedConnection

cluster_twisted = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=TwistedConnection
)

# Libev reactor (requires the libev system library and the driver's C extension)
from cassandra.io.libevreactor import LibevConnection

cluster_libev = Cluster(
    contact_points=['127.0.0.1'],
    connection_class=LibevConnection
)

# Use the appropriate reactor for your application
session = cluster_gevent.connect()
```

### Concurrent Query Execution

```python
import uuid

from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
from cassandra.query import SimpleStatement

# Example 1: Execute different statements concurrently.
# SimpleStatement uses %s placeholders; ? is reserved for prepared statements.
statements_and_params = [
    (SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Alice', 'alice@example.com']),
    (SimpleStatement("INSERT INTO users (id, name, email) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Bob', 'bob@example.com']),
    (SimpleStatement("INSERT INTO posts (id, author, title) VALUES (%s, %s, %s)"),
     [uuid.uuid4(), 'Alice', 'My First Post']),
    (SimpleStatement("UPDATE counters SET count = count + 1 WHERE id = %s"),
     ['total_users'])
]

# Execute all statements concurrently; record per-statement outcomes
# instead of raising on the first failure.
results = execute_concurrent(session, statements_and_params, concurrency=50,
                             raise_on_first_error=False)

# Check results: each entry is a (success, result_or_exc) pair
for i, (success, result_or_exc) in enumerate(results):
    if not success:
        print(f"Statement {i} failed: {result_or_exc}")
    else:
        print(f"Statement {i} succeeded")

# Example 2: Execute the same statement with different parameters.
# Preparing the statement is idiomatic for repeated execution.
insert_statement = session.prepare("INSERT INTO users (id, name, email) VALUES (?, ?, ?)")
user_data = [
    [uuid.uuid4(), 'User1', 'user1@example.com'],
    [uuid.uuid4(), 'User2', 'user2@example.com'],
    [uuid.uuid4(), 'User3', 'user3@example.com'],
    # ... many more users
]

# Execute all inserts concurrently
results = execute_concurrent_with_args(
    session,
    insert_statement,
    user_data,
    concurrency=100,
    raise_on_first_error=False
)

successful_inserts = sum(1 for success, _ in results if success)
print(f"Successfully inserted {successful_inserts} users")
```

### Advanced Callback Patterns

```python
import threading
import uuid
from collections import defaultdict

class AsyncQueryManager:
    def __init__(self, session):
        self.session = session
        self.results = defaultdict(list)
        self.errors = defaultdict(list)
        self.lock = threading.Lock()
        self.completed_count = 0
        self.total_queries = 0

    def execute_queries(self, query_groups):
        """Execute multiple groups of queries with organized results."""

        self.total_queries = sum(len(queries) for queries in query_groups.values())

        for group_name, queries in query_groups.items():
            for query, params in queries:
                future = self.session.execute_async(query, params)
                future.add_callback(self._handle_success, group_name)
                future.add_errback(self._handle_error, group_name)

    def _handle_success(self, result, group_name):
        with self.lock:
            self.results[group_name].append(result)
            self.completed_count += 1
            self._check_completion()

    def _handle_error(self, error, group_name):
        with self.lock:
            self.errors[group_name].append(error)
            self.completed_count += 1
            self._check_completion()

    def _check_completion(self):
        if self.completed_count >= self.total_queries:
            print("All queries completed!")
            self._print_summary()

    def _print_summary(self):
        # Cover groups that only appear in results or only in errors
        for group_name in set(self.results) | set(self.errors):
            success_count = len(self.results[group_name])
            error_count = len(self.errors[group_name])
            print(f"{group_name}: {success_count} successful, {error_count} failed")

# Usage
manager = AsyncQueryManager(session)

query_groups = {
    'user_inserts': [
        ("INSERT INTO users (id, name) VALUES (%s, %s)", [uuid.uuid4(), f'User{i}'])
        for i in range(100)
    ],
    'post_inserts': [
        ("INSERT INTO posts (id, title) VALUES (%s, %s)", [uuid.uuid4(), f'Post{i}'])
        for i in range(50)
    ],
    'analytics_queries': [
        ("SELECT COUNT(*) FROM users", []),
        ("SELECT COUNT(*) FROM posts", []),
        ("SELECT * FROM recent_activity LIMIT 10", [])
    ]
}

manager.execute_queries(query_groups)
```

### Asynchronous Pagination

```python
from cassandra.query import SimpleStatement

class AsyncPaginator:
    def __init__(self, session, query, page_size=1000):
        self.session = session
        self.query = query
        self.page_size = page_size
        self.future = None
        self.callbacks = []
        self.error_callbacks = []

    def add_page_callback(self, callback):
        """Add callback to be called for each page of results."""
        self.callbacks.append(callback)

    def add_error_callback(self, callback):
        """Add callback to be called on errors."""
        self.error_callbacks.append(callback)

    def start(self):
        """Start paginating through results."""
        statement = SimpleStatement(self.query, fetch_size=self.page_size)
        self.future = self.session.execute_async(statement)
        self.future.add_callbacks(self._handle_page, self._handle_error)

    def _handle_page(self, rows):
        """Handle a page of results; the driver passes the page's rows."""
        # Process current page
        for callback in self.callbacks:
            callback(rows)

        # Check whether there are more pages
        if self.future.has_more_pages:
            # Fetch the next page asynchronously; the callbacks registered
            # on the future fire again for each new page
            self.future.start_fetching_next_page()
        else:
            print("Pagination complete")

    def _handle_error(self, error):
        """Handle pagination errors."""
        for callback in self.error_callbacks:
            callback(error)

# Usage
def process_page(rows):
    print(f"Processing page with {len(rows)} rows")
    for row in rows:
        # Process each row
        pass

def handle_error(error):
    print(f"Pagination error: {error}")

paginator = AsyncPaginator(session, "SELECT * FROM large_table")
paginator.add_page_callback(process_page)
paginator.add_error_callback(handle_error)
paginator.start()

# Continue with other work while pagination happens in the background
```
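
When a page cursor must be persisted (for example, to resume paging in another process), the paging state can be carried explicitly instead. A sketch, assuming a driver version where `execute_async` accepts a `paging_state` keyword and result pages expose `paging_state`/`current_rows` (`fetch_page` is an illustrative helper):

```python
from cassandra.query import SimpleStatement

def fetch_page(session, query, page_size=1000, paging_state=None):
    """Fetch a single page; returns (rows, next_paging_state)."""
    statement = SimpleStatement(query, fetch_size=page_size)
    result = session.execute_async(statement, paging_state=paging_state).result()
    return result.current_rows, result.paging_state

# Fetch the first page, then keep resuming from the saved cursor
rows, cursor = fetch_page(session, "SELECT * FROM large_table")
while True:
    # ... process rows, persist cursor if needed ...
    if cursor is None:
        break
    rows, cursor = fetch_page(session, "SELECT * FROM large_table",
                              paging_state=cursor)
```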

### Asynchronous Batch Processing

```python
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from cassandra.concurrent import execute_concurrent

class AsyncBatchProcessor:
    def __init__(self, session, batch_size=1000, concurrency=10):
        self.session = session
        self.batch_size = batch_size
        self.concurrency = concurrency
        self.executor = ThreadPoolExecutor(max_workers=concurrency)

    def process_records(self, records, process_func):
        """Process records in batches asynchronously."""

        # Split records into batches
        batches = [
            records[i:i + self.batch_size]
            for i in range(0, len(records), self.batch_size)
        ]

        print(f"Processing {len(records)} records in {len(batches)} batches")

        # Process batches concurrently on worker threads
        futures = []
        for batch in batches:
            future = self.executor.submit(self._process_batch, batch, process_func)
            futures.append(future)

        # Wait for all batches to complete
        results = []
        for future in futures:
            try:
                result = future.result(timeout=60)
                results.append(result)
            except Exception as e:
                print(f"Batch processing error: {e}")
                results.append(None)

        return results

    def _process_batch(self, batch, process_func):
        """Process a single batch of records."""
        statements_and_params = []

        for record in batch:
            query, params = process_func(record)
            statements_and_params.append((query, params))

        # Execute the batch's statements concurrently; with the default
        # raise_on_first_error=True, a failing statement fails the batch
        results = execute_concurrent(
            self.session,
            statements_and_params,
            concurrency=self.concurrency
        )

        return results

# Usage
def create_insert_statement(user_data):
    """Convert user data to an insert statement."""
    # Plain string statements use %s placeholders in the Python driver
    query = "INSERT INTO users (id, name, email, created_at) VALUES (%s, %s, %s, %s)"
    params = [
        user_data['id'],
        user_data['name'],
        user_data['email'],
        datetime.utcnow()
    ]
    return query, params

# Process a large dataset
user_records = [
    {'id': uuid.uuid4(), 'name': f'User{i}', 'email': f'user{i}@example.com'}
    for i in range(10000)
]

processor = AsyncBatchProcessor(session, batch_size=500, concurrency=20)
results = processor.process_records(user_records, create_insert_statement)

# Analyze results
total_batches = len(results)
successful_batches = sum(1 for r in results if r is not None)
print(f"Processed {total_batches} batches, {successful_batches} successful")
```

### Connection Pool Monitoring

```python
class ConnectionPoolMonitor:
    def __init__(self, session):
        self.session = session

    def get_pool_stats(self):
        """Get connection pool statistics for all hosts."""
        stats = {}

        # Per-host pools live on the Session; _pools is a private
        # attribute and may differ between driver versions.
        for host, pool in getattr(self.session, '_pools', {}).items():
            stats[host.address] = {
                'host_state': 'UP' if host.is_up else 'DOWN',
                'datacenter': host.datacenter,
                'rack': host.rack,
                'open_connections': pool.open_count,
                'is_pool_shutdown': pool.is_shutdown,
                'pool_size': len(pool.get_connections()) if not pool.is_shutdown else 0
            }

        return stats

    def print_pool_summary(self):
        """Print a summary of connection pool status."""
        stats = self.get_pool_stats()

        print("Connection Pool Summary:")
        print("-" * 60)

        for host_address, host_stats in stats.items():
            print(f"Host: {host_address}")
            print(f"  State: {host_stats['host_state']}")
            print(f"  DC/Rack: {host_stats['datacenter']}/{host_stats['rack']}")
            print(f"  Open Connections: {host_stats['open_connections']}")
            print(f"  Pool Size: {host_stats['pool_size']}")
            print()

# Usage
session = cluster.connect()
monitor = ConnectionPoolMonitor(session)

# Monitor pool stats periodically
import time
import threading

def monitor_pools():
    while True:
        monitor.print_pool_summary()
        time.sleep(30)  # Check every 30 seconds

# Start monitoring in a background thread
monitor_thread = threading.Thread(target=monitor_pools, daemon=True)
monitor_thread.start()

# Your application continues running
# ... perform queries ...
```

### Asynchronous Query Timeout Handling

```python
import time

from cassandra import OperationTimedOut

class TimeoutHandler:
    def __init__(self, session, default_timeout=10.0):
        self.session = session
        self.default_timeout = default_timeout
        self.timeout_stats = {
            'total_queries': 0,
            'timeouts': 0,
            'retries': 0,
            'failures': 0
        }

    def execute_with_retry(self, query, params=None, max_retries=3, timeout=None):
        """Execute query with timeout handling and retries."""

        timeout = timeout or self.default_timeout

        for attempt in range(max_retries + 1):
            try:
                self.timeout_stats['total_queries'] += 1

                future = self.session.execute_async(query, params)
                result = future.result(timeout=timeout)

                return result

            except OperationTimedOut:
                self.timeout_stats['timeouts'] += 1

                if attempt < max_retries:
                    self.timeout_stats['retries'] += 1
                    print(f"Query timed out, retrying (attempt {attempt + 1}/{max_retries})")

                    # Exponential backoff before the next attempt
                    time.sleep(2 ** attempt)
                    continue
                else:
                    self.timeout_stats['failures'] += 1
                    print(f"Query failed after {max_retries} retries")
                    raise

            except Exception as e:
                self.timeout_stats['failures'] += 1
                print(f"Query failed with non-timeout error: {e}")
                raise

    def get_timeout_stats(self):
        """Get timeout statistics."""
        stats = self.timeout_stats.copy()
        if stats['total_queries'] > 0:
            stats['timeout_rate'] = stats['timeouts'] / stats['total_queries']
            stats['success_rate'] = (stats['total_queries'] - stats['failures']) / stats['total_queries']
        return stats

# Usage
timeout_handler = TimeoutHandler(session, default_timeout=5.0)

# Execute queries with timeout handling
try:
    result = timeout_handler.execute_with_retry(
        "SELECT * FROM slow_table WHERE complex_condition = %s",
        params=['some_value'],
        max_retries=2,
        timeout=15.0
    )
    rows = list(result)
    print(f"Query succeeded with {len(rows)} rows")

except OperationTimedOut:
    print("Query timed out after all retries")
except Exception as e:
    print(f"Query failed: {e}")

# Check timeout statistics
stats = timeout_handler.get_timeout_stats()
print(f"Timeout statistics: {stats}")
```
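
When most queries share one budget, a session-wide default can stand in for per-call timeouts. A brief sketch, assuming a driver version that exposes the `Session.default_timeout` attribute (it may not apply when execution profiles are in use):

```python
# Applies whenever execute()/execute_async() is called without an explicit
# timeout; the value is in seconds.
session.default_timeout = 15.0

result = session.execute("SELECT * FROM users")  # subject to the 15 s default
```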