0
# Query Execution
1
2
Statement types, query execution, batch operations, and result handling with comprehensive parameter binding and tracing support. The query execution system provides both simple string queries and prepared statements for optimal performance.
3
4
## Capabilities
5
6
### Statement Types
7
8
Different statement types for various query execution patterns and performance requirements.
9
10
```python { .api }
11
class Statement:
12
def __init__(self):
13
"""Base class for all statement types."""
14
15
@property
16
def consistency_level(self):
17
"""int: Consistency level for this statement"""
18
19
@consistency_level.setter
20
def consistency_level(self, consistency_level):
21
"""Set the consistency level"""
22
23
@property
24
def serial_consistency_level(self):
25
"""int: Serial consistency level for conditional statements"""
26
27
@serial_consistency_level.setter
28
def serial_consistency_level(self, serial_consistency_level):
29
"""Set the serial consistency level"""
30
31
@property
32
def retry_policy(self):
33
"""RetryPolicy: Retry policy for this statement"""
34
35
@retry_policy.setter
36
def retry_policy(self, retry_policy):
37
"""Set the retry policy"""
38
39
@property
40
def timeout(self):
41
"""float: Timeout for this statement in seconds"""
42
43
@timeout.setter
44
def timeout(self, timeout):
45
"""Set the timeout"""
46
47
@property
48
def trace(self):
49
"""bool: Whether tracing is enabled for this statement"""
50
51
@trace.setter
52
def trace(self, trace):
53
"""Enable or disable tracing"""
54
55
@property
56
def fetch_size(self):
57
"""int: Number of rows to fetch per page"""
58
59
@fetch_size.setter
60
def fetch_size(self, fetch_size):
61
"""Set the fetch size for paging"""
62
63
@property
64
def paging_state(self):
65
"""bytes: Paging state for continuing paged queries"""
66
67
@paging_state.setter
68
def paging_state(self, paging_state):
69
"""Set the paging state"""
70
71
class SimpleStatement(Statement):
72
def __init__(self, query_string, consistency_level=None, serial_consistency_level=None, fetch_size=None):
73
"""
74
A simple CQL query string statement.
75
76
Parameters:
77
- query_string (str): CQL query with optional parameter placeholders
78
- consistency_level (int): Consistency level for this query
79
- serial_consistency_level (int): Serial consistency for conditional queries
80
- fetch_size (int): Number of rows to fetch per page
81
"""
82
83
@property
84
def query_string(self):
85
"""str: The CQL query string"""
86
87
class PreparedStatement(Statement):
88
def __init__(self, column_metadata, query_id, routing_key_indexes, query_string, keyspace, protocol_version, session):
89
"""
90
A prepared statement with server-side query compilation.
91
92
Note: PreparedStatement objects are created by Session.prepare(), not directly instantiated.
93
"""
94
95
def bind(self, values):
96
"""
97
Bind parameter values to create a BoundStatement.
98
99
Parameters:
100
- values (list, tuple, or dict): Parameter values to bind
101
102
Returns:
103
BoundStatement: Statement with bound parameters ready for execution
104
"""
105
106
@property
107
def query_string(self):
108
"""str: The original CQL query string"""
109
110
@property
111
def column_metadata(self):
112
"""list: Metadata for query parameters and result columns"""
113
114
@property
115
def query_id(self):
116
"""bytes: Server-assigned query identifier"""
117
118
@property
119
def routing_key_indexes(self):
120
"""list: Indexes of parameters used for routing"""
121
122
class BoundStatement(Statement):
123
def __init__(self, prepared_statement, values=None):
124
"""
125
A prepared statement with bound parameter values.
126
127
Parameters:
128
- prepared_statement (PreparedStatement): The prepared statement to bind
129
- values (list, tuple, or dict): Parameter values
130
"""
131
132
@property
133
def prepared_statement(self):
134
"""PreparedStatement: The underlying prepared statement"""
135
136
@property
137
def values(self):
138
"""list: Bound parameter values"""
139
140
@property
141
def routing_key(self):
142
"""bytes: Routing key for token-aware load balancing"""
143
```
144
145
### Batch Operations
146
147
Batch multiple statements together for atomic execution or performance optimization.
148
149
```python { .api }
150
class BatchType:
151
"""Constants defining batch operation types."""
152
153
LOGGED = 0
154
"""Atomic batch with transaction log (default)"""
155
156
UNLOGGED = 1
157
"""Non-atomic batch for performance"""
158
159
COUNTER = 2
160
"""Batch for counter operations only"""
161
162
class BatchStatement(Statement):
163
def __init__(self, batch_type=BatchType.LOGGED, consistency_level=None, serial_consistency_level=None):
164
"""
165
A batch of multiple statements executed atomically.
166
167
Parameters:
168
- batch_type (int): Type of batch operation (LOGGED, UNLOGGED, or COUNTER)
169
- consistency_level (int): Consistency level for the batch
170
- serial_consistency_level (int): Serial consistency for conditional statements
171
"""
172
173
def add(self, statement, parameters=None):
174
"""
175
Add a statement to the batch.
176
177
Parameters:
178
- statement (str, SimpleStatement, PreparedStatement, or BoundStatement): Statement to add
179
- parameters (list, tuple, or dict): Parameters if statement is a string
180
"""
181
182
def add_all(self, statements_and_parameters):
183
"""
184
Add multiple statements to the batch.
185
186
Parameters:
187
- statements_and_parameters (iterable): Sequence of (statement, parameters) tuples
188
"""
189
190
def clear(self):
191
"""Remove all statements from the batch."""
192
193
@property
194
def size(self):
195
"""int: Number of statements in the batch"""
196
197
@property
198
def batch_type(self):
199
"""int: The batch type (LOGGED, UNLOGGED, or COUNTER)"""
200
```
201
202
### Row Factory Functions
203
204
Functions for customizing how query result rows are represented.
205
206
```python { .api }
207
def tuple_factory(colnames, rows):
208
"""
209
Row factory that returns rows as tuples.
210
211
Parameters:
212
- colnames (list): Column names from the result set
213
- rows (list): Raw row data from the database
214
215
Returns:
216
list: List of tuples representing rows
217
"""
218
219
def named_tuple_factory(colnames, rows):
220
"""
221
Row factory that returns rows as named tuples (default).
222
223
Parameters:
224
- colnames (list): Column names from the result set
225
- rows (list): Raw row data from the database
226
227
Returns:
228
list: List of named tuples with column names as attributes
229
"""
230
231
def dict_factory(colnames, rows):
232
"""
233
Row factory that returns rows as dictionaries.
234
235
Parameters:
236
- colnames (list): Column names from the result set
237
- rows (list): Raw row data from the database
238
239
Returns:
240
list: List of dictionaries with column names as keys
241
"""
242
243
def ordered_dict_factory(colnames, rows):
244
"""
245
Row factory that returns rows as OrderedDict objects.
246
247
Parameters:
248
- colnames (list): Column names from the result set
249
- rows (list): Raw row data from the database
250
251
Returns:
252
list: List of OrderedDict objects preserving column order
253
"""
254
```
255
256
### Query Tracing
257
258
Query execution tracing for performance analysis and debugging.
259
260
```python { .api }
261
class QueryTrace:
262
def __init__(self, trace_id, session):
263
"""
264
Trace information for an executed query.
265
266
Parameters:
267
- trace_id (UUID): Unique identifier for this trace
268
- session (Session): Session used to fetch trace details
269
"""
270
271
@property
272
def trace_id(self):
273
"""UUID: Unique identifier for this trace"""
274
275
@property
276
def request_type(self):
277
"""str: Type of request that was traced"""
278
279
@property
280
def duration(self):
281
"""int: Total duration of the query in microseconds"""
282
283
@property
284
def coordinator(self):
285
"""str: Address of the coordinator node"""
286
287
@property
288
def parameters(self):
289
"""dict: Query parameters"""
290
291
@property
292
def started_at(self):
293
"""datetime: When the query started executing"""
294
295
@property
296
def events(self):
297
"""list: List of TraceEvent objects showing execution steps"""
298
299
class TraceEvent:
300
def __init__(self, description, datetime, source, source_elapsed):
301
"""
302
Individual event in a query trace.
303
304
Parameters:
305
- description (str): Description of this trace event
306
- datetime (datetime): When this event occurred
307
- source (str): Address of the node where this event occurred
308
- source_elapsed (int): Elapsed microseconds since query start
309
"""
310
311
@property
312
def description(self):
313
"""str: Description of this trace event"""
314
315
@property
316
def datetime(self):
317
"""datetime: When this event occurred"""
318
319
@property
320
def source(self):
321
"""str: Address of the node where this event occurred"""
322
323
@property
324
def source_elapsed(self):
325
"""int: Elapsed microseconds since query start"""
326
327
class TraceUnavailable(Exception):
328
"""Exception raised when trace information is not available."""
329
pass
330
```
331
332
### Parameter Binding
333
334
Utilities for binding parameters to query strings.
335
336
```python { .api }
337
def bind_params(query, params, encoder):
338
"""
339
Bind parameters to a query string.
340
341
Parameters:
342
- query (str): CQL query string with parameter placeholders
343
- params (list, tuple, or dict): Parameter values to bind
344
- encoder (Encoder): Encoder instance for parameter serialization
345
346
Returns:
347
str: Query string with parameters substituted
348
349
Note: This is primarily used internally by the driver.
350
"""
351
```
352
353
## Usage Examples
354
355
### Simple Statements
356
357
```python
358
from cassandra.query import SimpleStatement
359
from cassandra import ConsistencyLevel
360
361
# Create a simple statement
362
statement = SimpleStatement(
363
"SELECT * FROM users WHERE age > %s",
364
consistency_level=ConsistencyLevel.ONE
365
)
366
367
# Execute with parameters
368
result = session.execute(statement, [25])
369
for row in result:
370
print(f"User: {row.name}, Age: {row.age}")
371
372
# Statement with fetch size for paging
373
paged_statement = SimpleStatement(
374
"SELECT * FROM large_table",
375
fetch_size=1000
376
)
377
378
result = session.execute(paged_statement)
379
for row in result:
380
# Results are automatically paged
381
print(row)
382
```
383
384
### Prepared Statements
385
386
```python
387
# Prepare a statement for repeated execution
388
insert_user = session.prepare("""
389
INSERT INTO users (id, name, email, age, created_at)
390
VALUES (?, ?, ?, ?, ?)
391
""")
392
393
# Bind and execute multiple times
394
import uuid
395
from datetime import datetime
396
397
for i in range(100):
398
bound_stmt = insert_user.bind([
399
uuid.uuid4(),
400
f'User {i}',
401
f'user{i}@example.com',
402
20 + (i % 50),
403
datetime.now()
404
])
405
session.execute(bound_stmt)
406
407
# Or execute directly with parameters
408
session.execute(insert_user, [
409
uuid.uuid4(),
410
'Alice Smith',
411
'alice@example.com',
412
30,
413
datetime.now()
414
])
415
```
416
417
### Batch Operations
418
419
```python
420
from cassandra.query import BatchStatement, BatchType
421
422
# Create a logged batch (atomic)
423
batch = BatchStatement(batch_type=BatchType.LOGGED)
424
425
# Add multiple insert statements
426
batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Alice'])
427
batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Bob'])
428
batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Charlie'])
429
430
# Execute the batch atomically
431
session.execute(batch)
432
433
# Counter batch for counter updates
434
counter_batch = BatchStatement(batch_type=BatchType.COUNTER)
435
counter_batch.add(SimpleStatement("UPDATE counters SET count = count + 1 WHERE id = %s"), [1])
436
counter_batch.add(SimpleStatement("UPDATE counters SET count = count + 5 WHERE id = %s"), [2])
437
session.execute(counter_batch)
438
439
# Unlogged batch for performance (not atomic)
440
unlogged_batch = BatchStatement(batch_type=BatchType.UNLOGGED)
441
for i in range(10):
442
unlogged_batch.add(
443
SimpleStatement("INSERT INTO logs (id, message) VALUES (%s, %s)"),
444
[uuid.uuid4(), f'Log message {i}']
445
)
446
session.execute(unlogged_batch)
447
```
448
449
### Custom Row Factories
450
451
```python
452
from cassandra.query import dict_factory, ordered_dict_factory
453
454
# Use dictionary row factory
455
session.row_factory = dict_factory
456
result = session.execute("SELECT id, name, age FROM users LIMIT 5")
457
for row in result:
458
print(f"User {row['name']} (ID: {row['id']}) is {row['age']} years old")
459
460
# Use ordered dictionary factory to preserve column order
461
session.row_factory = ordered_dict_factory
462
result = session.execute("SELECT * FROM users LIMIT 5")
463
for row in result:
464
print("Column order:", list(row.keys()))
465
print("Values:", list(row.values()))
466
467
# Custom row factory
468
def custom_row_factory(colnames, rows):
469
"""Convert rows to custom objects"""
470
class UserRow:
471
def __init__(self, **kwargs):
472
for key, value in kwargs.items():
473
setattr(self, key, value)
474
475
def __str__(self):
476
return f"User({self.name}, {self.age})"
477
478
return [UserRow(**dict(zip(colnames, row))) for row in rows]
479
480
session.row_factory = custom_row_factory
481
result = session.execute("SELECT name, age FROM users LIMIT 3")
482
for user in result:
483
print(user) # Uses custom __str__ method
484
```
485
486
### Query Tracing
487
488
```python
489
# Enable tracing for a specific query
490
statement = SimpleStatement("SELECT * FROM users WHERE id = %s")
491
statement.trace = True
492
493
result = session.execute(statement, [user_id])
494
495
# Get the trace information
496
trace = result.get_query_trace(max_wait=5.0)
497
print(f"Query duration: {trace.duration} microseconds")
498
print(f"Coordinator: {trace.coordinator}")
499
print(f"Request type: {trace.request_type}")
500
501
# Print all trace events
502
for event in trace.events:
503
print(f"[{event.source_elapsed:>6}μs] {event.source}: {event.description}")
504
505
# Enable tracing for all queries in a session
506
session.default_trace = True
507
result = session.execute("SELECT COUNT(*) FROM users")
508
trace = result.get_query_trace()
509
print(f"Count query took {trace.duration} microseconds")
510
```
511
512
### Paging Through Large Results
513
514
```python
515
# Automatic paging with fetch_size
516
statement = SimpleStatement("SELECT * FROM large_table", fetch_size=1000)
517
result = session.execute(statement)
518
519
# Iterate through all rows (automatic paging)
520
row_count = 0
521
for row in result:
522
row_count += 1
523
if row_count % 1000 == 0:
524
print(f"Processed {row_count} rows...")
525
526
# Manual paging control
527
statement = SimpleStatement("SELECT * FROM large_table", fetch_size=100)
528
result = session.execute(statement)
529
530
while True:
531
# Process current page
532
for row in result.current_rows:
533
process_row(row)
534
535
# Check if more pages available
536
if result.has_more_pages:
537
# Fetch next page
538
result = session.execute(statement, paging_state=result.paging_state)
539
else:
540
break
541
```
542
543
### Statement Configuration
544
545
```python
546
from cassandra import ConsistencyLevel
547
from cassandra.policies import FallthroughRetryPolicy
548
549
# Configure statement properties
550
statement = SimpleStatement("SELECT * FROM users WHERE region = %s")
551
statement.consistency_level = ConsistencyLevel.LOCAL_QUORUM
552
statement.serial_consistency_level = ConsistencyLevel.LOCAL_SERIAL
553
statement.timeout = 30.0
554
statement.retry_policy = FallthroughRetryPolicy()
555
statement.fetch_size = 5000
556
557
# Execute configured statement
558
result = session.execute(statement, ['US-East'])
559
560
# Same configuration for prepared statements
561
prepared = session.prepare("UPDATE users SET email = ? WHERE id = ?")
562
prepared.consistency_level = ConsistencyLevel.QUORUM
563
prepared.timeout = 10.0
564
565
bound = prepared.bind(['new@example.com', user_id])
566
session.execute(bound)
567
```