or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md auth-policies.md cluster-session.md cql-types.md cqlengine-orm.md index.md metadata.md query-execution.md

docs/query-execution.md

0

# Query Execution

1

2

This document covers statement types, query execution, batch operations, and result handling, including parameter binding and tracing support. The query execution system supports both simple string queries and prepared statements; prepared statements are compiled on the server and are the better choice for queries that are executed repeatedly.

3

4

## Capabilities

5

6

### Statement Types

7

8

The driver provides several statement types, each suited to a different query execution pattern and performance requirement.

9

10

```python { .api }

11

class Statement:

12

def __init__(self):

13

"""Base class for all statement types."""

14

15

@property

16

def consistency_level(self):

17

"""int: Consistency level for this statement"""

18

19

@consistency_level.setter

20

def consistency_level(self, consistency_level):

21

"""Set the consistency level"""

22

23

@property

24

def serial_consistency_level(self):

25

"""int: Serial consistency level for conditional statements"""

26

27

@serial_consistency_level.setter

28

def serial_consistency_level(self, serial_consistency_level):

29

"""Set the serial consistency level"""

30

31

@property

32

def retry_policy(self):

33

"""RetryPolicy: Retry policy for this statement"""

34

35

@retry_policy.setter

36

def retry_policy(self, retry_policy):

37

"""Set the retry policy"""

38

39

@property

40

def timeout(self):

41

"""float: Timeout for this statement in seconds"""

42

43

@timeout.setter

44

def timeout(self, timeout):

45

"""Set the timeout"""

46

47

@property

48

def trace(self):

49

"""bool: Whether tracing is enabled for this statement"""

50

51

@trace.setter

52

def trace(self, trace):

53

"""Enable or disable tracing"""

54

55

@property

56

def fetch_size(self):

57

"""int: Number of rows to fetch per page"""

58

59

@fetch_size.setter

60

def fetch_size(self, fetch_size):

61

"""Set the fetch size for paging"""

62

63

@property

64

def paging_state(self):

65

"""bytes: Paging state for continuing paged queries"""

66

67

@paging_state.setter

68

def paging_state(self, paging_state):

69

"""Set the paging state"""

70

71

class SimpleStatement(Statement):

72

def __init__(self, query_string, consistency_level=None, serial_consistency_level=None, fetch_size=None):

73

"""

74

A simple CQL query string statement.

75

76

Parameters:

77

- query_string (str): CQL query with optional parameter placeholders

78

- consistency_level (int): Consistency level for this query

79

- serial_consistency_level (int): Serial consistency for conditional queries

80

- fetch_size (int): Number of rows to fetch per page

81

"""

82

83

@property

84

def query_string(self):

85

"""str: The CQL query string"""

86

87

class PreparedStatement(Statement):

88

def __init__(self, column_metadata, query_id, routing_key_indexes, query_string, keyspace, protocol_version, session):

89

"""

90

A prepared statement with server-side query compilation.

91

92

Note: PreparedStatement objects are created by Session.prepare(), not directly instantiated.

93

"""

94

95

def bind(self, values):

96

"""

97

Bind parameter values to create a BoundStatement.

98

99

Parameters:

100

- values (list, tuple, or dict): Parameter values to bind

101

102

Returns:

103

BoundStatement: Statement with bound parameters ready for execution

104

"""

105

106

@property

107

def query_string(self):

108

"""str: The original CQL query string"""

109

110

@property

111

def column_metadata(self):

112

"""list: Metadata for query parameters and result columns"""

113

114

@property

115

def query_id(self):

116

"""bytes: Server-assigned query identifier"""

117

118

@property

119

def routing_key_indexes(self):

120

"""list: Indexes of parameters used for routing"""

121

122

class BoundStatement(Statement):

123

def __init__(self, prepared_statement, values=None):

124

"""

125

A prepared statement with bound parameter values.

126

127

Parameters:

128

- prepared_statement (PreparedStatement): The prepared statement to bind

129

- values (list, tuple, or dict): Parameter values

130

"""

131

132

@property

133

def prepared_statement(self):

134

"""PreparedStatement: The underlying prepared statement"""

135

136

@property

137

def values(self):

138

"""list: Bound parameter values"""

139

140

@property

141

def routing_key(self):

142

"""bytes: Routing key for token-aware load balancing"""

143

```

144

145

### Batch Operations

146

147

Batch multiple statements together for atomic execution or performance optimization.

148

149

```python { .api }

150

class BatchType:

151

"""Constants defining batch operation types."""

152

153

LOGGED = 0

154

"""Atomic batch with transaction log (default)"""

155

156

UNLOGGED = 1

157

"""Non-atomic batch for performance"""

158

159

COUNTER = 2

160

"""Batch for counter operations only"""

161

162

class BatchStatement(Statement):

163

def __init__(self, batch_type=BatchType.LOGGED, consistency_level=None, serial_consistency_level=None):

164

"""

165

A batch of multiple statements executed together. Only LOGGED batches are atomic; UNLOGGED batches are not.

166

167

Parameters:

168

- batch_type (int): Type of batch operation (LOGGED, UNLOGGED, or COUNTER)

169

- consistency_level (int): Consistency level for the batch

170

- serial_consistency_level (int): Serial consistency for conditional statements

171

"""

172

173

def add(self, statement, parameters=None):

174

"""

175

Add a statement to the batch.

176

177

Parameters:

178

- statement (str, SimpleStatement, PreparedStatement, or BoundStatement): Statement to add

179

- parameters (list, tuple, or dict): Values to bind to the statement's placeholders; omit when the statement already carries its values (e.g. a BoundStatement)

180

"""

181

182

def add_all(self, statements_and_parameters):

183

"""

184

Add multiple statements to the batch.

185

186

Parameters:

187

- statements_and_parameters (iterable): Sequence of (statement, parameters) tuples

188

"""

189

190

def clear(self):

191

"""Remove all statements from the batch."""

192

193

@property

194

def size(self):

195

"""int: Number of statements in the batch"""

196

197

@property

198

def batch_type(self):

199

"""int: The batch type (LOGGED, UNLOGGED, or COUNTER)"""

200

```

201

202

### Row Factory Functions

203

204

Functions for customizing how query result rows are represented.

205

206

```python { .api }

207

def tuple_factory(colnames, rows):

208

"""

209

Row factory that returns rows as tuples.

210

211

Parameters:

212

- colnames (list): Column names from the result set

213

- rows (list): Raw row data from the database

214

215

Returns:

216

list: List of tuples representing rows

217

"""

218

219

def named_tuple_factory(colnames, rows):

220

"""

221

Row factory that returns rows as named tuples (default).

222

223

Parameters:

224

- colnames (list): Column names from the result set

225

- rows (list): Raw row data from the database

226

227

Returns:

228

list: List of named tuples with column names as attributes

229

"""

230

231

def dict_factory(colnames, rows):

232

"""

233

Row factory that returns rows as dictionaries.

234

235

Parameters:

236

- colnames (list): Column names from the result set

237

- rows (list): Raw row data from the database

238

239

Returns:

240

list: List of dictionaries with column names as keys

241

"""

242

243

def ordered_dict_factory(colnames, rows):

244

"""

245

Row factory that returns rows as OrderedDict objects.

246

247

Parameters:

248

- colnames (list): Column names from the result set

249

- rows (list): Raw row data from the database

250

251

Returns:

252

list: List of OrderedDict objects preserving column order

253

"""

254

```

255

256

### Query Tracing

257

258

Query execution tracing for performance analysis and debugging.

259

260

```python { .api }

261

class QueryTrace:

262

def __init__(self, trace_id, session):

263

"""

264

Trace information for an executed query.

265

266

Parameters:

267

- trace_id (UUID): Unique identifier for this trace

268

- session (Session): Session used to fetch trace details

269

"""

270

271

@property

272

def trace_id(self):

273

"""UUID: Unique identifier for this trace"""

274

275

@property

276

def request_type(self):

277

"""str: Type of request that was traced"""

278

279

@property

280

def duration(self):

281

"""int: Total duration of the query in microseconds"""

282

283

@property

284

def coordinator(self):

285

"""str: Address of the coordinator node"""

286

287

@property

288

def parameters(self):

289

"""dict: Query parameters"""

290

291

@property

292

def started_at(self):

293

"""datetime: When the query started executing"""

294

295

@property

296

def events(self):

297

"""list: List of TraceEvent objects showing execution steps"""

298

299

class TraceEvent:

300

def __init__(self, description, datetime, source, source_elapsed):

301

"""

302

Individual event in a query trace.

303

304

Parameters:

305

- description (str): Description of this trace event

306

- datetime (datetime): When this event occurred

307

- source (str): Address of the node where this event occurred

308

- source_elapsed (int): Elapsed microseconds since query start

309

"""

310

311

@property

312

def description(self):

313

"""str: Description of this trace event"""

314

315

@property

316

def datetime(self):

317

"""datetime: When this event occurred"""

318

319

@property

320

def source(self):

321

"""str: Address of the node where this event occurred"""

322

323

@property

324

def source_elapsed(self):

325

"""int: Elapsed microseconds since query start"""

326

327

class TraceUnavailable(Exception):

328

"""Exception raised when trace information is not available."""

329

pass

330

```

331

332

### Parameter Binding

333

334

Utilities for binding parameters to query strings.

335

336

```python { .api }

337

def bind_params(query, params, encoder):

338

"""

339

Bind parameters to a query string.

340

341

Parameters:

342

- query (str): CQL query string with parameter placeholders

343

- params (list, tuple, or dict): Parameter values to bind

344

- encoder (Encoder): Encoder instance for parameter serialization

345

346

Returns:

347

str: Query string with parameters substituted

348

349

Note: This is primarily used internally by the driver.

350

"""

351

```

352

353

## Usage Examples

354

355

### Simple Statements

356

357

```python

358

from cassandra.query import SimpleStatement

359

from cassandra import ConsistencyLevel

360

361

# Create a simple statement

362

statement = SimpleStatement(

363

"SELECT * FROM users WHERE age > %s",

364

consistency_level=ConsistencyLevel.ONE

365

)

366

367

# Execute with parameters

368

result = session.execute(statement, [25])

369

for row in result:

370

print(f"User: {row.name}, Age: {row.age}")

371

372

# Statement with fetch size for paging

373

paged_statement = SimpleStatement(

374

"SELECT * FROM large_table",

375

fetch_size=1000

376

)

377

378

result = session.execute(paged_statement)

379

for row in result:

380

# Results are automatically paged

381

print(row)

382

```

383

384

### Prepared Statements

385

386

```python

387

# Prepare a statement for repeated execution

388

insert_user = session.prepare("""

389

INSERT INTO users (id, name, email, age, created_at)

390

VALUES (?, ?, ?, ?, ?)

391

""")

392

393

# Bind and execute multiple times

394

import uuid

395

from datetime import datetime

396

397

for i in range(100):

398

bound_stmt = insert_user.bind([

399

uuid.uuid4(),

400

f'User {i}',

401

f'user{i}@example.com',

402

20 + (i % 50),

403

datetime.now()

404

])

405

session.execute(bound_stmt)

406

407

# Or execute directly with parameters

408

session.execute(insert_user, [

409

uuid.uuid4(),

410

'Alice Smith',

411

'alice@example.com',

412

30,

413

datetime.now()

414

])

415

```

416

417

### Batch Operations

418

419

```python

420

from cassandra.query import BatchStatement, BatchType

421

422

# Create a logged batch (atomic)

423

batch = BatchStatement(batch_type=BatchType.LOGGED)

424

425

# Add multiple insert statements

426

batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Alice'])

427

batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Bob'])

428

batch.add(SimpleStatement("INSERT INTO users (id, name) VALUES (%s, %s)"), [uuid.uuid4(), 'Charlie'])

429

430

# Execute the batch atomically

431

session.execute(batch)

432

433

# Counter batch for counter updates

434

counter_batch = BatchStatement(batch_type=BatchType.COUNTER)

435

counter_batch.add(SimpleStatement("UPDATE counters SET count = count + 1 WHERE id = %s"), [1])

436

counter_batch.add(SimpleStatement("UPDATE counters SET count = count + 5 WHERE id = %s"), [2])

437

session.execute(counter_batch)

438

439

# Unlogged batch for performance (not atomic)

440

unlogged_batch = BatchStatement(batch_type=BatchType.UNLOGGED)

441

for i in range(10):

442

unlogged_batch.add(

443

SimpleStatement("INSERT INTO logs (id, message) VALUES (%s, %s)"),

444

[uuid.uuid4(), f'Log message {i}']

445

)

446

session.execute(unlogged_batch)

447

```

448

449

### Custom Row Factories

450

451

```python

452

from cassandra.query import dict_factory, ordered_dict_factory

453

454

# Use dictionary row factory

455

session.row_factory = dict_factory

456

result = session.execute("SELECT id, name, age FROM users LIMIT 5")

457

for row in result:

458

print(f"User {row['name']} (ID: {row['id']}) is {row['age']} years old")

459

460

# Use ordered dictionary factory to preserve column order

461

session.row_factory = ordered_dict_factory

462

result = session.execute("SELECT * FROM users LIMIT 5")

463

for row in result:

464

print("Column order:", list(row.keys()))

465

print("Values:", list(row.values()))

466

467

# Custom row factory

468

def custom_row_factory(colnames, rows):

469

"""Convert rows to custom objects"""

470

class UserRow:

471

def __init__(self, **kwargs):

472

for key, value in kwargs.items():

473

setattr(self, key, value)

474

475

def __str__(self):

476

return f"User({self.name}, {self.age})"

477

478

return [UserRow(**dict(zip(colnames, row))) for row in rows]

479

480

session.row_factory = custom_row_factory

481

result = session.execute("SELECT name, age FROM users LIMIT 3")

482

for user in result:

483

print(user) # Uses custom __str__ method

484

```

485

486

### Query Tracing

487

488

```python

489

# Enable tracing for a specific query

490

statement = SimpleStatement("SELECT * FROM users WHERE id = %s")

491

statement.trace = True

492

493

result = session.execute(statement, [user_id])

494

495

# Get the trace information

496

trace = result.get_query_trace(max_wait=5.0)

497

print(f"Query duration: {trace.duration} microseconds")

498

print(f"Coordinator: {trace.coordinator}")

499

print(f"Request type: {trace.request_type}")

500

501

# Print all trace events

502

for event in trace.events:

503

print(f"[{event.source_elapsed:>6}μs] {event.source}: {event.description}")

504

505

# Enable tracing for all queries in a session

506

session.default_trace = True

507

result = session.execute("SELECT COUNT(*) FROM users")

508

trace = result.get_query_trace()

509

print(f"Count query took {trace.duration} microseconds")

510

```

511

512

### Paging Through Large Results

513

514

```python

515

# Automatic paging with fetch_size

516

statement = SimpleStatement("SELECT * FROM large_table", fetch_size=1000)

517

result = session.execute(statement)

518

519

# Iterate through all rows (automatic paging)

520

row_count = 0

521

for row in result:

522

row_count += 1

523

if row_count % 1000 == 0:

524

print(f"Processed {row_count} rows...")

525

526

# Manual paging control

527

statement = SimpleStatement("SELECT * FROM large_table", fetch_size=100)

528

result = session.execute(statement)

529

530

while True:

531

# Process current page

532

for row in result.current_rows:

533

process_row(row)

534

535

# Check if more pages available

536

if result.has_more_pages:

537

# Fetch next page

538

result = session.execute(statement, paging_state=result.paging_state)

539

else:

540

break

541

```

542

543

### Statement Configuration

544

545

```python

546

from cassandra import ConsistencyLevel

547

from cassandra.policies import FallthroughRetryPolicy

548

549

# Configure statement properties

550

statement = SimpleStatement("SELECT * FROM users WHERE region = %s")

551

statement.consistency_level = ConsistencyLevel.LOCAL_QUORUM

552

statement.serial_consistency_level = ConsistencyLevel.LOCAL_SERIAL

553

statement.timeout = 30.0

554

statement.retry_policy = FallthroughRetryPolicy()

555

statement.fetch_size = 5000

556

557

# Execute configured statement

558

result = session.execute(statement, ['US-East'])

559

560

# Same configuration for prepared statements

561

prepared = session.prepare("UPDATE users SET email = ? WHERE id = ?")

562

prepared.consistency_level = ConsistencyLevel.QUORUM

563

prepared.timeout = 10.0

564

565

bound = prepared.bind(['new@example.com', user_id])

566

session.execute(bound)

567

```