# Results Processing

The driver supports multiple result formats and processing modes for handling query results efficiently: standard tuples, streaming iterators, progress tracking, and optional NumPy arrays. This flexibility lets result processing be tuned for memory usage and performance across different workload patterns.

## Capabilities

### Standard Query Results

Basic query result storage and access with metadata information.

```python { .api }
class QueryResult:
    """
    Standard query result storage.

    Contains complete query results loaded into memory with
    optional column metadata information.
    """

    def __init__(self, data, columns_with_types=None):
        """
        Initialize query result.

        Parameters:
        - data: List of result row tuples
        - columns_with_types: Optional column metadata
        """

    @property
    def data(self):
        """
        Query result data.

        Returns:
        - List[Tuple]: Result rows as tuples
        """

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
```
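
The shape of what `execute()` hands back follows directly from the `with_column_types` and `columnar` flags described above. A minimal sketch, assuming a reachable local server:

```python
from clickhouse_driver import Client

client = Client('localhost')

# Default: list of row tuples
rows = client.execute('SELECT number FROM system.numbers LIMIT 3')
# [(0,), (1,), (2,)]

# with_column_types=True: (rows, [(name, type), ...])
rows, columns = client.execute(
    'SELECT number FROM system.numbers LIMIT 3',
    with_column_types=True
)
# columns == [('number', 'UInt64')]

# columnar=True: one sequence per column instead of row tuples
data = client.execute('SELECT number FROM system.numbers LIMIT 3', columnar=True)
# [(0, 1, 2)]
```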

### Streaming Query Results

Memory-efficient streaming results for processing large datasets without loading everything into memory.

```python { .api }
class IterQueryResult:
    """
    Streaming query result iterator.

    Yields result blocks incrementally for memory-efficient
    processing of large query results.
    """

    def __iter__(self):
        """
        Iterator protocol support.

        Returns:
        - Iterator: Self as iterator
        """

    def __next__(self):
        """
        Get next result block.

        Returns:
        - List[Tuple]: Next block of result rows

        Raises:
        - StopIteration: When no more blocks available
        """

    @property
    def columns_with_types(self):
        """
        Column metadata (available after first block).

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
```
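
At the client level, `execute_iter()` consumes these blocks and yields individual rows, so calling code rarely sees the block boundaries. A minimal sketch, assuming a local server; `max_block_size` only tunes the size of the blocks fetched underneath:

```python
from clickhouse_driver import Client

client = Client('localhost')

settings = {'max_block_size': 100000}
rows_gen = client.execute_iter(
    'SELECT number FROM system.numbers LIMIT 10',
    settings=settings
)

for row in rows_gen:
    print(row)  # (0,), (1,), ... one tuple per row
```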

### Progress Tracking Results

Query results with execution progress information for monitoring long-running operations.

```python { .api }
class ProgressQueryResult:
    """
    Query result with progress information.

    Iterating yields (rows_read, total_rows) progress tuples while the
    query executes; call get_result() after iteration to obtain the data.
    """

    def __iter__(self):
        """
        Iterator protocol yielding progress updates.

        Yields:
        - Tuple[int, int]: (rows_read, total_rows) processed so far
        """

    def get_result(self):
        """
        Consume any remaining progress packets and return the result.

        Returns:
        - List[Tuple]: Result rows as tuples
        """

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
```

### Progress Information

Detailed progress tracking for query execution monitoring.

```python { .api }
class Progress:
    """
    Query execution progress information.

    Provides detailed metrics about query processing progress
    including rows processed, bytes processed, and timing information.
    """

    def __init__(self, rows=0, bytes=0, total_rows=0, written_rows=0, written_bytes=0):
        """
        Initialize progress information.

        Parameters:
        - rows: Number of rows processed
        - bytes: Number of bytes processed
        - total_rows: Total estimated rows to process
        - written_rows: Number of rows written (for INSERT)
        - written_bytes: Number of bytes written (for INSERT)
        """

    @property
    def rows(self):
        """Number of rows processed so far."""

    @property
    def bytes(self):
        """Number of bytes processed so far."""

    @property
    def total_rows(self):
        """Total estimated rows to process (0 if unknown)."""

    @property
    def written_rows(self):
        """Number of rows written (for INSERT operations)."""

    @property
    def written_bytes(self):
        """Number of bytes written (for INSERT operations)."""

    def __str__(self):
        """Human-readable progress representation."""
```
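
After a blocking `execute()` call, the accumulated progress counters can be read back from the client. A minimal sketch, assuming a recent clickhouse-driver where execution statistics are exposed via `client.last_query`:

```python
from clickhouse_driver import Client

client = Client('localhost')
client.execute('SELECT max(number) FROM numbers(10)')

# Progress counters accumulated while the query ran
progress = client.last_query.progress
print(progress.rows, progress.bytes, progress.total_rows)
```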

### Query Execution Information

Metadata about query execution including statistics and profiling data.

```python { .api }
class QueryInfo:
    """
    Query execution information and statistics.

    Contains metadata about query execution including
    performance metrics and resource usage.
    """

    def __init__(self, query_id=None, elapsed=None, rows_read=None, bytes_read=None):
        """
        Initialize query information.

        Parameters:
        - query_id: Unique query identifier
        - elapsed: Query execution time in seconds
        - rows_read: Total rows read during execution
        - bytes_read: Total bytes read during execution
        """

    @property
    def query_id(self):
        """Unique query identifier string."""

    @property
    def elapsed(self):
        """Query execution time in seconds."""

    @property
    def rows_read(self):
        """Total rows read during query execution."""

    @property
    def bytes_read(self):
        """Total bytes read during query execution."""
```

### Block Stream Profile Information

Detailed execution profiling for performance analysis and optimization.

```python { .api }
class BlockStreamProfileInfo:
    """
    Execution profiling information.

    Detailed performance metrics for query execution analysis
    including timing breakdowns and resource utilization.
    """

    def __init__(self, rows=0, blocks=0, bytes=0, applied_limit=0,
                 rows_before_limit=0, calculated_rows_before_limit=0):
        """
        Initialize profile information.

        Parameters:
        - rows: Number of rows in profile
        - blocks: Number of blocks processed
        - bytes: Number of bytes processed
        - applied_limit: Whether LIMIT was applied
        - rows_before_limit: Row count before LIMIT application
        - calculated_rows_before_limit: Calculated rows before LIMIT
        """

    @property
    def rows(self):
        """Number of rows in this profile segment."""

    @property
    def blocks(self):
        """Number of blocks processed."""

    @property
    def bytes(self):
        """Number of bytes processed."""

    @property
    def applied_limit(self):
        """Whether LIMIT clause was applied."""

    @property
    def rows_before_limit(self):
        """Row count before LIMIT application."""
```
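
The timing and profiling counters described in the last two sections can be read back after a query in the same way as the progress counters. A short sketch, again assuming `client.last_query` is populated as in recent clickhouse-driver releases:

```python
from clickhouse_driver import Client

client = Client('localhost')
client.execute('SELECT number FROM numbers(100) LIMIT 10')

info = client.last_query
print(info.elapsed)                         # wall-clock execution time in seconds
print(info.profile_info.rows_before_limit)  # rows seen before LIMIT was applied
```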

### NumPy Integration Results

High-performance NumPy-based results for numerical computing workloads (requires NumPy extras).

```python { .api }
class NumpyQueryResult:
    """
    NumPy-optimized query results.

    Query results stored as NumPy arrays for high-performance
    numerical computing workloads.

    Requires: pip install clickhouse-driver[numpy]
    """

    @property
    def data(self):
        """
        Query result data as NumPy structured array.

        Returns:
        - numpy.ndarray: Structured array with typed columns
        """

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """


class NumpyIterQueryResult:
    """
    NumPy streaming query result iterator.

    Streaming results with NumPy array blocks for memory-efficient
    processing of large numerical datasets.
    """

    def __iter__(self):
        """Iterator yielding NumPy array blocks."""

    def __next__(self):
        """
        Get next NumPy result block.

        Returns:
        - numpy.ndarray: Next block as structured array
        """


class NumpyProgressQueryResult:
    """
    NumPy query result with progress tracking.

    Progress-aware NumPy results for monitoring long-running
    numerical computations.
    """

    def __iter__(self):
        """
        Iterator yielding progress updates.

        Yields:
        - Tuple[int, int]: (rows_read, total_rows); call get_result()
          after iteration to obtain the NumPy data
        """
```
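
For workloads that go straight into pandas, the NumPy path also backs `query_dataframe()`. A brief sketch, assuming the `[numpy]` extra (which also requires pandas) is installed and a local server is reachable:

```python
from clickhouse_driver import Client

# use_numpy must be enabled for the NumPy/pandas result path
client = Client('localhost', settings={'use_numpy': True})

# Builds a pandas DataFrame from the NumPy column blocks
df = client.query_dataframe(
    'SELECT number AS value, number * 2 AS doubled '
    'FROM system.numbers LIMIT 1000'
)
print(df.describe())
```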

## Result Processing Examples

### Basic Result Handling

```python
from clickhouse_driver import Client

client = Client('localhost')

# Standard query execution
result = client.execute('SELECT name, age FROM users LIMIT 10')
for row in result:
    name, age = row
    print(f"{name}: {age} years old")

# Query with column types: the return value is (rows, columns_with_types)
rows, columns = client.execute(
    'SELECT name, age, salary FROM employees',
    with_column_types=True
)

print("Columns:", columns)
# Output: [('name', 'String'), ('age', 'UInt8'), ('salary', 'Decimal(10, 2)')]

for row in rows:
    name, age, salary = row
    print(f"{name}, {age}, ${salary}")
```

### Streaming Large Results

```python
# Memory-efficient processing of large datasets
query = 'SELECT * FROM large_table WHERE date >= %(start_date)s'
params = {'start_date': '2023-01-01'}

total_processed = 0

# execute_iter() streams rows one at a time instead of loading the full result
for row in client.execute_iter(query, params):
    process_row(row)  # your per-row handler
    total_processed += 1

    if total_processed % 10000 == 0:
        print(f"Processed {total_processed:,} rows so far...")

print(f"Total rows processed: {total_processed}")
```
374
375
### Progress Monitoring
376
377
```python
378
# Monitor progress of long-running queries
379
query = '''
380
SELECT category, COUNT(*) as count, AVG(price) as avg_price
381
FROM huge_sales_table
382
WHERE date >= '2020-01-01'
383
GROUP BY category
384
'''
385
386
print("Starting large aggregation query...")
387
start_time = time.time()
388
389
for progress, block in client.execute_with_progress(query):
390
if progress:
391
elapsed = time.time() - start_time
392
print(f"Progress: {progress.rows:,} rows processed in {elapsed:.1f}s")
393
394
if progress.total_rows > 0:
395
percent = (progress.rows / progress.total_rows) * 100
396
print(f"Estimated completion: {percent:.1f}%")
397
398
if block:
399
# Process results as they arrive
400
for category, count, avg_price in block:
401
print(f"{category}: {count:,} sales, avg ${avg_price:.2f}")
402
403
print("Query completed!")
404
```
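
A long-running query can also be aborted from inside the progress loop. A brief sketch, assuming a 20-second budget and using `Client.cancel()` to stop the server-side query:

```python
import time

progress = client.execute_with_progress(
    'SELECT category, count() FROM huge_sales_table GROUP BY category'
)

timeout = 20
started_at = time.time()

for rows_read, total_rows in progress:
    if time.time() - started_at > timeout:
        client.cancel()  # ask the server to stop the query
        break
else:
    # Only reached if the loop finished without breaking out
    result = progress.get_result()
```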

### Columnar Data Processing

```python
# Process data in columnar format
data, columns = client.execute(
    'SELECT user_id, purchase_amount, purchase_date FROM purchases',
    with_column_types=True,
    columnar=True
)

# columnar=True returns one sequence per column instead of row tuples
user_ids, amounts, dates = data if data else ([], [], [])

# Efficient column-wise processing
total_revenue = sum(amounts)
unique_users = len(set(user_ids))
latest_date = max(dates) if dates else None

print(f"Revenue: ${total_revenue:,.2f}")
print(f"Unique users: {unique_users:,}")
print(f"Latest purchase: {latest_date}")
```

### Result Format Comparisons

```python
# Compare different result formats
query = 'SELECT id, name, score FROM test_data LIMIT 1000'

# Standard tuple format (default)
result = client.execute(query)
print(f"Standard format: {len(result)} rows")
print(f"First row: {result[0]}")

# With column information: returns (rows, columns_with_types)
rows, columns = client.execute(query, with_column_types=True)
print(f"With types: {columns}")
print(f"Data: {len(rows)} rows")

# Columnar format: data holds one sequence per column
data, columns = client.execute(query, with_column_types=True, columnar=True)
if data:
    ids, names, scores = data
    print(f"Columnar: {len(ids)} ids, {len(names)} names, {len(scores)} scores")
```

### External Table Results

```python
# Process results with external tables
external_lookup = {
    'name': 'category_lookup',
    'structure': [('id', 'UInt32'), ('name', 'String')],
    # Rows are passed as dicts keyed by column name
    'data': [
        {'id': 1, 'name': 'Electronics'},
        {'id': 2, 'name': 'Clothing'},
        {'id': 3, 'name': 'Books'},
    ]
}

result = client.execute('''
    SELECT p.product_name, c.name AS category_name, p.price
    FROM products p
    JOIN category_lookup c ON p.category_id = c.id
    WHERE p.price > 100
''', external_tables=[external_lookup])

for product_name, category_name, price in result:
    print(f"{product_name} ({category_name}): ${price}")
```

### NumPy Integration

```python
# High-performance numerical processing (requires the numpy extra)
import numpy as np

# Enable NumPy results for this client
client = Client('localhost', settings={'use_numpy': True})

# Query returning numerical data; columnar=True yields one NumPy array per column
data, columns = client.execute('''
    SELECT
        measurement_time,
        sensor_id,
        temperature,
        humidity,
        pressure
    FROM sensor_data
    WHERE measurement_time >= now() - INTERVAL 1 HOUR
''', columnar=True, with_column_types=True)

# Map column names to their arrays
arrays = {name: column for (name, _), column in zip(columns, data)}

# Efficient NumPy operations
avg_temp = np.mean(arrays['temperature'])
max_humidity = np.max(arrays['humidity'])
temp_std = np.std(arrays['temperature'])

print(f"Average temperature: {avg_temp:.2f}°C")
print(f"Max humidity: {max_humidity:.1f}%")
print(f"Temperature std dev: {temp_std:.2f}°C")

# Sensors that reported temperatures above 30°C
high_temp_sensors = np.unique(arrays['sensor_id'][arrays['temperature'] > 30])
print(f"High temperature sensors: {high_temp_sensors}")
```

### Error Handling with Results

```python
from clickhouse_driver.errors import PartiallyConsumedQueryError

try:
    # Start streaming query
    result_iter = client.execute_iter('SELECT * FROM large_table')

    # Process only the first rows, leaving the stream partially consumed
    for i, row in enumerate(result_iter):
        if i >= 100:
            break
        process_row(row)  # your per-row handler

    # Try to execute another query before consuming all results
    client.execute('SELECT 1')  # Raises PartiallyConsumedQueryError

except PartiallyConsumedQueryError:
    print("Previous query not fully consumed - finishing iteration")

    # Consume remaining results
    for row in result_iter:
        pass  # Discard remaining rows

    # Now a new query can be executed
    client.execute('SELECT 1')
```
540
541
### Custom Result Processing
542
543
```python
544
def process_query_with_callback(client, query, row_callback, batch_size=1000):
545
"""Process query results with custom callback function."""
546
547
processed_count = 0
548
549
for block in client.execute_iter(query):
550
for row in block:
551
try:
552
row_callback(row)
553
processed_count += 1
554
555
if processed_count % batch_size == 0:
556
print(f"Processed {processed_count:,} rows")
557
558
except Exception as e:
559
print(f"Error processing row {processed_count}: {e}")
560
continue
561
562
return processed_count
563
564
# Usage with custom processing
565
def analyze_sale(row):
566
product_id, sale_amount, customer_id = row
567
# Custom analysis logic
568
if sale_amount > 1000:
569
print(f"Large sale: ${sale_amount} for product {product_id}")
570
571
total = process_query_with_callback(
572
client,
573
'SELECT product_id, amount, customer_id FROM sales',
574
analyze_sale
575
)
576
577
print(f"Analyzed {total:,} sales records")
578
```