0
# Utilities
1
2
Development and testing utilities including data generation tools, external data support, configuration management, and helper functions for enhanced developer experience with ClickHouse Connect.
3
4
## Capabilities
5
6
### Data Generation Tools
7
8
Utilities for generating random test data matching ClickHouse column types for development, testing, and benchmarking purposes.
9
10
```python { .api }
11
from clickhouse_connect.tools.datagen import RandomValueDef, random_col_data, random_value_gen
12
13
RandomValueDef = NamedTuple('RandomValueDef', [
14
('name', str),
15
('ch_type', str),
16
('nullable', bool),
17
('low_card', bool)
18
])
19
"""
20
Definition for random value generation parameters.
21
22
Fields:
23
- name: Column name
24
- ch_type: ClickHouse type string
25
- nullable: Whether to generate NULL values
26
- low_card: Use low cardinality for string types
27
"""
28
29
def random_col_data(
30
ch_type: str,
31
size: int,
32
nullable: bool = False
33
) -> list:
34
"""
35
Generate random data for a ClickHouse column type.
36
37
Parameters:
38
- ch_type: ClickHouse type string (e.g., 'Int32', 'String', 'DateTime')
39
- size: Number of values to generate
40
- nullable: Include NULL values in generated data
41
42
Returns:
43
List of random values matching the specified type
44
45
Supported types:
46
- Integer types: Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
47
- Float types: Float32, Float64, Decimal
48
- String types: String, FixedString, LowCardinality(String)
49
- Date/time types: Date, DateTime, DateTime64
50
- Boolean: Bool
51
- Array types: Array(T) for supported inner types
52
- Complex types: Tuple, Map, Nested (basic support)
53
54
Example:
55
data = random_col_data('Int32', 1000, nullable=True)
56
# Returns list of 1000 random integers with some NULL values
57
"""
58
59
def random_value_gen(
60
ch_type: str,
61
nullable: bool = False
62
) -> Generator:
63
"""
64
Create generator for random values of specified ClickHouse type.
65
66
Parameters:
67
- ch_type: ClickHouse type string
68
- nullable: Include NULL values in generation
69
70
Returns:
71
Generator yielding random values of the specified type
72
73
Example:
74
gen = random_value_gen('Float64', nullable=True)
75
values = [next(gen) for _ in range(100)]
76
"""
77
78
# Additional data generation functions
79
def random_float() -> float: ...
80
def random_float32() -> float: ...
81
def random_decimal(prec: int, scale: int) -> Decimal: ...
82
def random_datetime() -> datetime: ...
83
def random_datetime_tz(timezone: tzinfo) -> datetime: ...
84
def random_datetime64(prec: int) -> datetime: ...
85
def random_ascii_str(max_len: int = 200, min_len: int = 0) -> str: ...
86
def random_utf8_str(max_len: int = 200) -> str: ...
87
def fixed_len_ascii_str(str_len: int = 200) -> str: ...
88
def random_ipv6() -> str: ...
89
def random_tuple(element_types: Sequence[ClickHouseType], col_def) -> tuple: ...
90
def random_map(key_type, value_type, sz: int, col_def) -> dict: ...
91
def random_nested(keys: Sequence[str], types: Sequence[ClickHouseType], col_def: RandomValueDef) -> dict: ...
92
```
93
94
### Testing Utilities
95
96
Testing framework utilities including table context managers and test data management for comprehensive testing scenarios.
97
98
```python { .api }
99
from clickhouse_connect.tools.testing import TableContext
100
101
class TableContext:
102
"""
103
Context manager for creating and cleaning up test tables.
104
105
Provides automatic table lifecycle management for testing,
106
ensuring tables are properly created and cleaned up even
107
if tests fail or are interrupted.
108
"""
109
110
def __init__(
111
self,
112
client,
113
table_name: str,
114
columns: list[tuple[str, str]],
115
engine: str = 'Memory',
116
database: str = '',
117
cleanup: bool = True
118
):
119
"""
120
Initialize table context for testing.
121
122
Parameters:
123
- client: ClickHouse client instance
124
- table_name: Name of test table to create
125
- columns: List of (column_name, column_type) tuples
126
- engine: ClickHouse table engine (default: Memory for testing)
127
- database: Target database (uses client default if empty)
128
- cleanup: Whether to drop table on context exit
129
130
Example:
131
with TableContext(client, 'test_users', [
132
('id', 'UInt32'),
133
('name', 'String'),
134
('created', 'DateTime')
135
]) as table:
136
# Use table for testing
137
client.insert(table.full_name, test_data)
138
result = client.query(f'SELECT * FROM {table.full_name}')
139
# Table automatically dropped here
140
"""
141
142
def __enter__(self) -> 'TableContext':
143
"""Create table and return context."""
144
145
def __exit__(self, exc_type, exc_val, exc_tb):
146
"""Clean up table if cleanup enabled."""
147
148
@property
149
def full_name(self) -> str:
150
"""Get fully qualified table name (database.table)."""
151
152
def insert_data(self, data: list, column_names: list[str] = None):
153
"""Insert test data into the table."""
154
155
def query(self, query: str = None) -> QueryResult:
156
"""Query the test table."""
157
```
158
159
### External Data Support
160
161
Support for external data sources and temporary tables for complex query scenarios and data integration workflows.
162
163
```python { .api }
164
from clickhouse_connect.driver.external import ExternalData, ExternalFile
165
166
class ExternalFile:
167
"""
168
External file definition for ClickHouse queries.
169
170
Allows referencing external data files in queries,
171
enabling complex data processing scenarios without
172
permanently storing data in ClickHouse.
173
"""
174
175
def __init__(
176
self,
177
name: str,
178
content: bytes | BinaryIO,
179
fmt: str = 'TSV',
180
types: list[str] = None,
181
structure: str = ''
182
):
183
"""
184
Initialize external file definition.
185
186
Parameters:
187
- name: Logical name for the external file in queries
188
- content: File content as bytes or binary stream
189
- fmt: Data format (TSV, CSV, JSON, etc.)
190
- types: List of ClickHouse type names for columns
191
- structure: Column structure string (alternative to types)
192
193
Example:
194
external_file = ExternalFile(
195
name='lookup_data',
196
content=csv_data.encode('utf-8'),
197
fmt='CSV',
198
types=['String', 'Int32', 'Float64']
199
)
200
"""
201
202
class ExternalData:
203
"""
204
Container for external data sources in queries.
205
206
Manages collection of external files and temporary
207
tables that can be referenced in ClickHouse queries.
208
"""
209
210
def __init__(self):
211
"""Initialize empty external data container."""
212
213
def add_file(
214
self,
215
name: str,
216
content: bytes | BinaryIO,
217
fmt: str = 'TSV',
218
types: list[str] = None,
219
structure: str = ''
220
):
221
"""
222
Add external file to the container.
223
224
Parameters match ExternalFile constructor.
225
"""
226
227
def add_table(
228
self,
229
name: str,
230
data: list[list],
231
column_names: list[str],
232
column_types: list[str]
233
):
234
"""
235
Add external table data to the container.
236
237
Parameters:
238
- name: Logical table name for queries
239
- data: Table data as list of rows
240
- column_names: Column names
241
- column_types: ClickHouse type names for columns
242
"""
243
244
def clear(self):
245
"""Remove all external data sources."""
246
247
@property
248
def files(self) -> dict[str, ExternalFile]:
249
"""Get dictionary of external files by name."""
250
```
251
252
### Common Configuration
253
254
Centralized configuration management and common settings for ClickHouse Connect applications.
255
256
```python { .api }
257
from clickhouse_connect.common import (
258
version,
259
build_client_name,
260
get_setting,
261
set_setting,
262
CommonSetting
263
)
264
265
def version() -> str:
266
"""
267
Get ClickHouse Connect package version.
268
269
Returns:
270
Version string (e.g., '0.8.18')
271
"""
272
273
def build_client_name(client_name: str = '') -> str:
274
"""
275
Build User-Agent string for HTTP requests.
276
277
Parameters:
278
- client_name: Custom client identifier to prepend
279
280
Returns:
281
Complete User-Agent string with client name and version info
282
283
Example:
284
user_agent = build_client_name('MyApp/1.0')
285
# Returns: 'MyApp/1.0 clickhouse-connect/0.8.18 (Python/3.9.0)'
286
"""
287
288
def get_setting(key: str) -> Any:
289
"""
290
Get common setting value.
291
292
Parameters:
293
- key: Setting name
294
295
Returns:
296
Setting value, or default if not set
297
298
Available settings:
299
- autogenerate_session_id: Auto-generate session IDs (bool)
300
- readonly: Global readonly mode (str)
301
- use_protocol_version: Use enhanced protocol features (bool)
302
- max_connection_age: Maximum connection age in seconds (int)
303
"""
304
305
def set_setting(key: str, value: Any):
306
"""
307
Set common setting value.
308
309
Parameters:
310
- key: Setting name
311
- value: Setting value
312
313
Example:
314
set_setting('autogenerate_session_id', True)
315
set_setting('max_connection_age', 3600)
316
"""
317
318
class CommonSetting:
319
"""
320
Common setting definition with metadata.
321
322
Provides setting definition, validation, and
323
default value management for global settings.
324
"""
325
326
def __init__(
327
self,
328
name: str,
329
default_value: Any,
330
description: str = '',
331
validator: callable = None
332
):
333
"""
334
Initialize common setting definition.
335
336
Parameters:
337
- name: Setting name
338
- default_value: Default value for the setting
339
- description: Human-readable description
340
- validator: Optional validation function
341
"""
342
```
343
344
### Helper Functions and Utilities
345
346
Miscellaneous utility functions for data conversion, identifier handling, and common operations.
347
348
```python { .api }
349
from clickhouse_connect.driver.binding import quote_identifier
350
from clickhouse_connect.driver.common import dict_copy, coerce_int, coerce_bool
351
from clickhouse_connect.driver.tzutil import normalize_timezone
352
353
def quote_identifier(identifier: str) -> str:
354
"""
355
Quote ClickHouse identifier if needed.
356
357
Parameters:
358
- identifier: Table name, column name, or other identifier
359
360
Returns:
361
Properly quoted identifier for safe use in SQL
362
363
Example:
364
safe_name = quote_identifier('user-data') # Returns: `user-data`
365
safe_name = quote_identifier('normal_name') # Returns: normal_name
366
"""
367
368
def dict_copy(source: dict, **kwargs) -> dict:
369
"""
370
Create dictionary copy with optional updates.
371
372
Parameters:
373
- source: Source dictionary to copy
374
- **kwargs: Additional key-value pairs to include/override
375
376
Returns:
377
New dictionary with copied and updated values
378
"""
379
380
def coerce_int(value: Any) -> int:
381
"""
382
Safely convert value to integer.
383
384
Parameters:
385
- value: Value to convert
386
387
Returns:
388
Integer value, or 0 if conversion fails
389
"""
390
391
def coerce_bool(value: Any) -> bool:
392
"""
393
Safely convert value to boolean.
394
395
Parameters:
396
- value: Value to convert
397
398
Returns:
399
Boolean value using ClickHouse-style conversion rules
400
"""
401
402
def normalize_timezone(tz) -> tuple[tzinfo, bool]:
403
"""
404
Normalize timezone and check DST safety.
405
406
Parameters:
407
- tz: Timezone object or string
408
409
Returns:
410
Tuple of (normalized_timezone, is_dst_safe)
411
"""
412
```
413
414
### Common Settings Management
415
416
Global configuration settings that affect client behavior across all instances and operations.
417
418
```python { .api }
419
from clickhouse_connect.common import get_setting, set_setting, version
420
421
def version() -> str:
422
"""
423
Get ClickHouse Connect package version.
424
425
Returns:
426
Package version string (e.g., '0.8.18')
427
"""
428
429
def get_setting(name: str) -> Any:
430
"""
431
Get global ClickHouse Connect setting value.
432
433
Parameters:
434
- name: Setting name
435
436
Returns:
437
Current setting value or None if not set
438
439
Available settings:
440
- 'autogenerate_session_id': Auto-generate session IDs (default: True)
441
- 'dict_parameter_format': Dict parameter format: 'json'|'map' (default: 'json')
442
- 'invalid_setting_action': Invalid setting action: 'send'|'drop'|'error' (default: 'error')
443
- 'max_connection_age': Max connection reuse time in seconds (default: 600)
444
- 'product_name': Product name for client identification (default: '')
445
- 'send_os_user': Include OS user in client identification (default: True)
446
- 'use_protocol_version': Use client protocol version (default: True)
447
- 'max_error_size': Maximum error message size (default: 1024)
448
- 'http_buffer_size': HTTP streaming buffer size (default: 10MB)
449
"""
450
451
def set_setting(name: str, value: Any) -> None:
452
"""
453
Set global ClickHouse Connect setting.
454
455
Parameters:
456
- name: Setting name (see get_setting for available settings)
457
- value: Setting value
458
459
Note: Changes affect new client instances created after the setting is changed
460
"""
461
462
def build_client_name(client_name: str = '') -> str:
463
"""
464
Build User-Agent string for HTTP requests.
465
466
Parameters:
467
- client_name: Custom client name to prepend
468
469
Returns:
470
Complete User-Agent string with version and system info
471
"""
472
```
473
474
## Usage Examples
475
476
### Random Data Generation
477
478
```python
479
import clickhouse_connect
480
from clickhouse_connect.tools.datagen import random_col_data, random_value_gen
481
482
client = clickhouse_connect.get_client(host='localhost')
483
484
# Generate test data for different column types
485
user_ids = random_col_data('UInt32', 1000)
486
user_names = random_col_data('String', 1000)
487
signup_dates = random_col_data('DateTime', 1000)
488
scores = random_col_data('Float64', 1000, nullable=True)
489
490
# Combine into test dataset
491
test_data = list(zip(user_ids, user_names, signup_dates, scores))
492
493
# Create test table and insert data
494
client.command("""
495
CREATE TABLE test_users (
496
id UInt32,
497
name String,
498
signup_date DateTime,
499
score Nullable(Float64)
500
) ENGINE = Memory
501
""")
502
503
client.insert('test_users', test_data,
504
column_names=['id', 'name', 'signup_date', 'score'])
505
506
print(f"Inserted {len(test_data)} test records")
507
508
# Query test data
509
result = client.query("SELECT count(), avg(score) FROM test_users WHERE score IS NOT NULL")
510
count, avg_score = result.first_row  # first_row is a property, not a method
511
print(f"Records: {count}, Average score: {avg_score:.2f}")
512
513
# Generate streaming test data
514
def generate_test_batch(batch_id: int, batch_size: int = 1000):
515
"""Generate a batch of test data."""
516
id_gen = random_value_gen('UInt32')
517
name_gen = random_value_gen('String')
518
date_gen = random_value_gen('DateTime')
519
520
batch = []
521
for i in range(batch_size):
522
batch.append([
523
next(id_gen) + batch_id * batch_size, # Ensure unique IDs
524
f"batch_{batch_id}_{next(name_gen)}",
525
next(date_gen),
526
next(random_value_gen('Float64', nullable=True))
527
])
528
return batch
529
530
# Insert multiple batches
531
for batch_id in range(5):
532
batch_data = generate_test_batch(batch_id)
533
client.insert('test_users', batch_data,
534
column_names=['id', 'name', 'signup_date', 'score'])
535
536
print("Inserted 5 batches of test data")
537
538
# Clean up
539
client.command("DROP TABLE test_users")
540
```
541
542
### Table Context Testing
543
544
```python
545
import clickhouse_connect
546
from clickhouse_connect.tools.testing import TableContext
547
548
client = clickhouse_connect.get_client(host='localhost')
549
550
def test_user_operations():
551
"""Test user operations with automatic table cleanup."""
552
553
with TableContext(
554
client,
555
'test_users',
556
[
557
('id', 'UInt32'),
558
('name', 'String'),
559
('email', 'String'),
560
('created_at', 'DateTime')
561
],
562
engine='Memory'
563
) as table:
564
565
# Insert test data
566
test_users = [
567
[1, 'Alice', 'alice@example.com', '2023-01-01 10:00:00'],
568
[2, 'Bob', 'bob@example.com', '2023-01-02 11:00:00'],
569
[3, 'Carol', 'carol@example.com', '2023-01-03 12:00:00']
570
]
571
572
table.insert_data(test_users, ['id', 'name', 'email', 'created_at'])
573
574
# Test queries
575
result = table.query("SELECT count() FROM {}")
576
        assert result.first_item == 3, "Should have 3 users"
577
578
result = table.query("SELECT name FROM {} WHERE id = 2")
579
        assert result.first_item == 'Bob', "User 2 should be Bob"
580
581
# Test with WHERE clause
582
result = client.query(
583
f"SELECT name FROM {table.full_name} WHERE email LIKE '%@example.com'"
584
)
585
assert len(result.result_set) == 3, "All users have example.com email"
586
587
print("All user operation tests passed!")
588
589
# Table automatically cleaned up here
590
591
def test_with_multiple_tables():
592
"""Test operations across multiple tables."""
593
594
with TableContext(client, 'users', [('id', 'UInt32'), ('name', 'String')]) as users_table, \
595
TableContext(client, 'orders', [('user_id', 'UInt32'), ('amount', 'Float64')]) as orders_table:
596
597
# Insert related data
598
users_table.insert_data([[1, 'Alice'], [2, 'Bob']], ['id', 'name'])
599
orders_table.insert_data([[1, 99.99], [1, 149.99], [2, 79.99]], ['user_id', 'amount'])
600
601
# Test join query
602
result = client.query(f"""
603
SELECT u.name, sum(o.amount) as total
604
FROM {users_table.full_name} u
605
JOIN {orders_table.full_name} o ON u.id = o.user_id
606
GROUP BY u.name
607
ORDER BY total DESC
608
""")
609
610
assert len(result.result_set) == 2, "Should have 2 users with orders"
611
assert result.result_set[0][1] == 249.98, "Alice should have highest total"
612
613
print("Multi-table test passed!")
614
615
# Both tables automatically cleaned up
616
617
# Run tests
618
test_user_operations()
619
test_with_multiple_tables()
620
```
621
622
### External Data Integration
623
624
```python
625
import clickhouse_connect
626
from clickhouse_connect.driver.external import ExternalData
627
import csv
628
import io
629
630
client = clickhouse_connect.get_client(host='localhost')
631
632
def process_with_external_data():
633
"""Process data using external files in queries."""
634
635
# Create CSV data for lookup
636
csv_data = """country_code,country_name,region
637
US,United States,North America
638
CA,Canada,North America
639
GB,United Kingdom,Europe
640
DE,Germany,Europe
641
JP,Japan,Asia
642
"""
643
644
# Create external data container
645
external_data = ExternalData()
646
647
# Add CSV file as external data
648
external_data.add_file(
649
name='country_lookup',
650
content=csv_data.encode('utf-8'),
651
fmt='CSV',
652
types=['String', 'String', 'String']
653
)
654
655
# Add programmatic data as external table
656
currency_data = [
657
['US', 'USD', 1.0],
658
['CA', 'CAD', 0.75],
659
['GB', 'GBP', 1.25],
660
['DE', 'EUR', 1.10],
661
['JP', 'JPY', 0.007]
662
]
663
664
external_data.add_table(
665
name='exchange_rates',
666
data=currency_data,
667
column_names=['country_code', 'currency', 'rate'],
668
column_types=['String', 'String', 'Float64']
669
)
670
671
# Query combining main data with external data
672
result = client.query("""
673
SELECT
674
c.country_name,
675
c.region,
676
e.currency,
677
e.rate,
678
1000 * e.rate as local_amount
679
FROM country_lookup c
680
JOIN exchange_rates e ON c.country_code = e.country_code
681
ORDER BY c.region, c.country_name
682
""", external_data=external_data)
683
684
print("Country data with exchange rates:")
685
for row in result.result_set:
686
country, region, currency, rate, local_amount = row
687
print(f"{country} ({region}): {currency} - Rate: {rate}, 1000 USD = {local_amount:.2f} {currency}")
688
689
def process_large_external_file():
690
"""Process large external file efficiently."""
691
692
# Generate large CSV data
693
csv_buffer = io.StringIO()
694
writer = csv.writer(csv_buffer)
695
writer.writerow(['id', 'category', 'value'])
696
697
for i in range(10000):
698
writer.writerow([i, f'category_{i % 10}', i * 1.5])
699
700
csv_content = csv_buffer.getvalue().encode('utf-8')
701
702
external_data = ExternalData()
703
external_data.add_file(
704
name='large_dataset',
705
content=csv_content,
706
fmt='CSV',
707
types=['UInt32', 'String', 'Float64']
708
)
709
710
# Process external data with aggregation
711
result = client.query("""
712
SELECT
713
category,
714
count() as record_count,
715
avg(value) as avg_value,
716
sum(value) as total_value
717
FROM large_dataset
718
GROUP BY category
719
ORDER BY category
720
""", external_data=external_data)
721
722
print("\nLarge dataset aggregation:")
723
for category, count, avg_val, total_val in result.result_set:
724
print(f"{category}: {count} records, avg: {avg_val:.2f}, total: {total_val:.2f}")
725
726
# Run examples
727
process_with_external_data()
728
process_large_external_file()
729
```
730
731
### Configuration Management
732
733
```python
734
import clickhouse_connect
735
from clickhouse_connect.common import (
736
get_setting, set_setting, version, build_client_name
737
)
738
739
# Display package information
740
print(f"ClickHouse Connect version: {version()}")
741
print(f"Default User-Agent: {build_client_name()}")
742
print(f"Custom User-Agent: {build_client_name('MyApp/2.0')}")
743
744
# Configure global settings
745
print("\nConfiguring global settings:")
746
set_setting('autogenerate_session_id', True)
747
set_setting('max_connection_age', 7200) # 2 hours
748
749
print(f"Auto-generate session ID: {get_setting('autogenerate_session_id')}")
750
print(f"Max connection age: {get_setting('max_connection_age')} seconds")
751
752
# Create client with global settings applied
753
client = clickhouse_connect.get_client(
754
host='localhost',
755
client_name='MyApp/2.0'
756
)
757
758
# Check if session ID was auto-generated
759
session_info = client.command("SELECT sessionId()")
760
print(f"Session ID: {session_info}")
761
762
# Test connection health
763
if client.ping():
764
print("Connection is healthy")
765
766
# Get server info
767
server_version = client.command("SELECT version()")
768
server_uptime = client.command("SELECT uptime()")
769
770
print(f"Server version: {server_version}")
771
print(f"Server uptime: {server_uptime} seconds")
772
773
client.close()
774
```
775
776
### Utility Functions
777
778
```python
779
import clickhouse_connect
780
from clickhouse_connect.driver.binding import quote_identifier
781
from clickhouse_connect.driver.common import dict_copy, coerce_int, coerce_bool
782
783
client = clickhouse_connect.get_client(host='localhost')
784
785
# Safe identifier quoting
786
table_names = ['users', 'user-data', 'order_items', 'special@table']
787
for name in table_names:
788
quoted = quote_identifier(name)
789
print(f"'{name}' -> {quoted}")
790
791
# Build dynamic query with safe identifiers
792
def build_select_query(table_name: str, columns: list[str], where_clause: str = ''):
793
"""Build SELECT query with safe identifier quoting."""
794
795
safe_table = quote_identifier(table_name)
796
safe_columns = [quote_identifier(col) for col in columns]
797
798
query = f"SELECT {', '.join(safe_columns)} FROM {safe_table}"
799
if where_clause:
800
query += f" WHERE {where_clause}"
801
802
return query
803
804
# Example usage
805
query = build_select_query(
806
'user-data',
807
['user-id', 'full-name', 'created-at'],
808
'status = 1'
809
)
810
print(f"\nGenerated query: {query}")
811
812
# Configuration merging
813
base_config = {
814
'host': 'localhost',
815
'port': 8123,
816
'compress': 'lz4'
817
}
818
819
# Create variations with dict_copy
820
dev_config = dict_copy(base_config, database='development', port=8124)
821
prod_config = dict_copy(base_config,
822
host='prod.clickhouse.com',
823
database='production',
824
secure=True)
825
826
print(f"\nBase config: {base_config}")
827
print(f"Dev config: {dev_config}")
828
print(f"Prod config: {prod_config}")
829
830
# Type coercion utilities
831
test_values = ['123', '0', 'true', 'false', '1', '', None, 42]
832
833
print("\nType coercion examples:")
834
for value in test_values:
835
int_val = coerce_int(value)
836
bool_val = coerce_bool(value)
837
print(f"{repr(value):>8} -> int: {int_val:>3}, bool: {bool_val}")
838
839
client.close()
840
```