Or run:

`npx @tessl/cli init`
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-api.md, data-formats.md, dbapi.md, exceptions.md, index.md, sqlalchemy.md, utilities.md

docs/utilities.md

0

# Utilities

1

2

Development and testing utilities including data generation tools, external data support, configuration management, and helper functions for enhanced developer experience with ClickHouse Connect.

3

4

## Capabilities

5

6

### Data Generation Tools

7

8

Utilities for generating random test data matching ClickHouse column types for development, testing, and benchmarking purposes.

9

10

```python { .api }

11

from clickhouse_connect.tools.datagen import RandomValueDef, random_col_data, random_value_gen

12

13

RandomValueDef = NamedTuple('RandomValueDef', [

14

('name', str),

15

('ch_type', str),

16

('nullable', bool),

17

('low_card', bool)

18

])

19

"""

20

Definition for random value generation parameters.

21

22

Fields:

23

- name: Column name

24

- ch_type: ClickHouse type string

25

- nullable: Whether to generate NULL values

26

- low_card: Use low cardinality for string types

27

"""

28

29

def random_col_data(

30

ch_type: str,

31

size: int,

32

nullable: bool = False

33

) -> list:

34

"""

35

Generate random data for a ClickHouse column type.

36

37

Parameters:

38

- ch_type: ClickHouse type string (e.g., 'Int32', 'String', 'DateTime')

39

- size: Number of values to generate

40

- nullable: Include NULL values in generated data

41

42

Returns:

43

List of random values matching the specified type

44

45

Supported types:

46

- Integer types: Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64

47

- Float types: Float32, Float64, Decimal

48

- String types: String, FixedString, LowCardinality(String)

49

- Date/time types: Date, DateTime, DateTime64

50

- Boolean: Bool

51

- Array types: Array(T) for supported inner types

52

- Complex types: Tuple, Map, Nested (basic support)

53

54

Example:

55

data = random_col_data('Int32', 1000, nullable=True)

56

# Returns list of 1000 random integers with some NULL values

57

"""

58

59

def random_value_gen(

60

ch_type: str,

61

nullable: bool = False

62

) -> Generator:

63

"""

64

Create generator for random values of specified ClickHouse type.

65

66

Parameters:

67

- ch_type: ClickHouse type string

68

- nullable: Include NULL values in generation

69

70

Returns:

71

Generator yielding random values of the specified type

72

73

Example:

74

gen = random_value_gen('Float64', nullable=True)

75

values = [next(gen) for _ in range(100)]

76

"""

77

78

# Additional data generation functions

79

def random_float() -> float: ...

80

def random_float32() -> float: ...

81

def random_decimal(prec: int, scale: int) -> Decimal: ...

82

def random_datetime() -> datetime: ...

83

def random_datetime_tz(timezone: tzinfo) -> datetime: ...

84

def random_datetime64(prec: int) -> datetime: ...

85

def random_ascii_str(max_len: int = 200, min_len: int = 0) -> str: ...

86

def random_utf8_str(max_len: int = 200) -> str: ...

87

def fixed_len_ascii_str(str_len: int = 200) -> str: ...

88

def random_ipv6() -> str: ...

89

def random_tuple(element_types: Sequence[ClickHouseType], col_def) -> tuple: ...

90

def random_map(key_type, value_type, sz: int, col_def) -> dict: ...

91

def random_nested(keys: Sequence[str], types: Sequence[ClickHouseType], col_def: RandomValueDef) -> dict: ...

92

```

93

94

### Testing Utilities

95

96

Testing framework utilities including table context managers and test data management for comprehensive testing scenarios.

97

98

```python { .api }

99

from clickhouse_connect.tools.testing import TableContext

100

101

class TableContext:

102

"""

103

Context manager for creating and cleaning up test tables.

104

105

Provides automatic table lifecycle management for testing,

106

ensuring tables are properly created and cleaned up even

107

if tests fail or are interrupted.

108

"""

109

110

def __init__(

111

self,

112

client,

113

table_name: str,

114

columns: list[tuple[str, str]],

115

engine: str = 'Memory',

116

database: str = '',

117

cleanup: bool = True

118

):

119

"""

120

Initialize table context for testing.

121

122

Parameters:

123

- client: ClickHouse client instance

124

- table_name: Name of test table to create

125

- columns: List of (column_name, column_type) tuples

126

- engine: ClickHouse table engine (default: Memory for testing)

127

- database: Target database (uses client default if empty)

128

- cleanup: Whether to drop table on context exit

129

130

Example:

131

with TableContext(client, 'test_users', [

132

('id', 'UInt32'),

133

('name', 'String'),

134

('created', 'DateTime')

135

]) as table:

136

# Use table for testing

137

client.insert(table.full_name, test_data)

138

result = client.query(f'SELECT * FROM {table.full_name}')

139

# Table automatically dropped here

140

"""

141

142

def __enter__(self) -> 'TableContext':

143

"""Create table and return context."""

144

145

def __exit__(self, exc_type, exc_val, exc_tb):

146

"""Clean up table if cleanup enabled."""

147

148

@property

149

def full_name(self) -> str:

150

"""Get fully qualified table name (database.table)."""

151

152

def insert_data(self, data: list, column_names: list[str] = None):

153

"""Insert test data into the table."""

154

155

def query(self, query: str = None) -> QueryResult:

156

"""Query the test table."""

157

```

158

159

### External Data Support

160

161

Support for external data sources and temporary tables for complex query scenarios and data integration workflows.

162

163

```python { .api }

164

from clickhouse_connect.driver.external import ExternalData, ExternalFile

165

166

class ExternalFile:

167

"""

168

External file definition for ClickHouse queries.

169

170

Allows referencing external data files in queries,

171

enabling complex data processing scenarios without

172

permanently storing data in ClickHouse.

173

"""

174

175

def __init__(

176

self,

177

name: str,

178

content: bytes | BinaryIO,

179

fmt: str = 'TSV',

180

types: list[str] = None,

181

structure: str = ''

182

):

183

"""

184

Initialize external file definition.

185

186

Parameters:

187

- name: Logical name for the external file in queries

188

- content: File content as bytes or binary stream

189

- fmt: Data format (TSV, CSV, JSON, etc.)

190

- types: List of ClickHouse type names for columns

191

- structure: Column structure string (alternative to types)

192

193

Example:

194

external_file = ExternalFile(

195

name='lookup_data',

196

content=csv_data.encode('utf-8'),

197

fmt='CSV',

198

types=['String', 'Int32', 'Float64']

199

)

200

"""

201

202

class ExternalData:

203

"""

204

Container for external data sources in queries.

205

206

Manages collection of external files and temporary

207

tables that can be referenced in ClickHouse queries.

208

"""

209

210

def __init__(self):

211

"""Initialize empty external data container."""

212

213

def add_file(

214

self,

215

name: str,

216

content: bytes | BinaryIO,

217

fmt: str = 'TSV',

218

types: list[str] = None,

219

structure: str = ''

220

):

221

"""

222

Add external file to the container.

223

224

Parameters match ExternalFile constructor.

225

"""

226

227

def add_table(

228

self,

229

name: str,

230

data: list[list],

231

column_names: list[str],

232

column_types: list[str]

233

):

234

"""

235

Add external table data to the container.

236

237

Parameters:

238

- name: Logical table name for queries

239

- data: Table data as list of rows

240

- column_names: Column names

241

- column_types: ClickHouse type names for columns

242

"""

243

244

def clear(self):

245

"""Remove all external data sources."""

246

247

@property

248

def files(self) -> dict[str, ExternalFile]:

249

"""Get dictionary of external files by name."""

250

```

251

252

### Common Configuration

253

254

Centralized configuration management and common settings for ClickHouse Connect applications.

255

256

```python { .api }

257

from clickhouse_connect.common import (

258

version,

259

build_client_name,

260

get_setting,

261

set_setting,

262

CommonSetting

263

)

264

265

def version() -> str:

266

"""

267

Get ClickHouse Connect package version.

268

269

Returns:

270

Version string (e.g., '0.8.18')

271

"""

272

273

def build_client_name(client_name: str = '') -> str:

274

"""

275

Build User-Agent string for HTTP requests.

276

277

Parameters:

278

- client_name: Custom client identifier to prepend

279

280

Returns:

281

Complete User-Agent string with client name and version info

282

283

Example:

284

user_agent = build_client_name('MyApp/1.0')

285

# Returns: 'MyApp/1.0 clickhouse-connect/0.8.18 (Python/3.9.0)'

286

"""

287

288

def get_setting(key: str) -> Any:

289

"""

290

Get common setting value.

291

292

Parameters:

293

- key: Setting name

294

295

Returns:

296

Setting value, or default if not set

297

298

Available settings:

299

- autogenerate_session_id: Auto-generate session IDs (bool)

300

- readonly: Global readonly mode (str)

301

- use_protocol_version: Use enhanced protocol features (bool)

302

- max_connection_age: Maximum connection age in seconds (int)

303

"""

304

305

def set_setting(key: str, value: Any):

306

"""

307

Set common setting value.

308

309

Parameters:

310

- key: Setting name

311

- value: Setting value

312

313

Example:

314

set_setting('autogenerate_session_id', True)

315

set_setting('max_connection_age', 3600)

316

"""

317

318

class CommonSetting:

319

"""

320

Common setting definition with metadata.

321

322

Provides setting definition, validation, and

323

default value management for global settings.

324

"""

325

326

def __init__(

327

self,

328

name: str,

329

default_value: Any,

330

description: str = '',

331

validator: callable = None

332

):

333

"""

334

Initialize common setting definition.

335

336

Parameters:

337

- name: Setting name

338

- default_value: Default value for the setting

339

- description: Human-readable description

340

- validator: Optional validation function

341

"""

342

```

343

344

### Helper Functions and Utilities

345

346

Miscellaneous utility functions for data conversion, identifier handling, and common operations.

347

348

```python { .api }

349

from clickhouse_connect.driver.binding import quote_identifier

350

from clickhouse_connect.driver.common import dict_copy, coerce_int, coerce_bool

351

from clickhouse_connect.driver.tzutil import normalize_timezone

352

353

def quote_identifier(identifier: str) -> str:

354

"""

355

Quote ClickHouse identifier if needed.

356

357

Parameters:

358

- identifier: Table name, column name, or other identifier

359

360

Returns:

361

Properly quoted identifier for safe use in SQL

362

363

Example:

364

safe_name = quote_identifier('user-data') # Returns: `user-data`

365

safe_name = quote_identifier('normal_name') # Returns: normal_name

366

"""

367

368

def dict_copy(source: dict, **kwargs) -> dict:

369

"""

370

Create dictionary copy with optional updates.

371

372

Parameters:

373

- source: Source dictionary to copy

374

- **kwargs: Additional key-value pairs to include/override

375

376

Returns:

377

New dictionary with copied and updated values

378

"""

379

380

def coerce_int(value: Any) -> int:

381

"""

382

Safely convert value to integer.

383

384

Parameters:

385

- value: Value to convert

386

387

Returns:

388

Integer value, or 0 if conversion fails

389

"""

390

391

def coerce_bool(value: Any) -> bool:

392

"""

393

Safely convert value to boolean.

394

395

Parameters:

396

- value: Value to convert

397

398

Returns:

399

Boolean value using ClickHouse-style conversion rules

400

"""

401

402

def normalize_timezone(tz) -> tuple[tzinfo, bool]:

403

"""

404

Normalize timezone and check DST safety.

405

406

Parameters:

407

- tz: Timezone object or string

408

409

Returns:

410

Tuple of (normalized_timezone, is_dst_safe)

411

"""

412

```

413

414

### Common Settings Management

415

416

Global configuration settings that affect client behavior across all instances and operations.

417

418

```python { .api }

419

from clickhouse_connect.common import get_setting, set_setting, version

420

421

def version() -> str:

422

"""

423

Get ClickHouse Connect package version.

424

425

Returns:

426

Package version string (e.g., '0.8.18')

427

"""

428

429

def get_setting(name: str) -> Any:

430

"""

431

Get global ClickHouse Connect setting value.

432

433

Parameters:

434

- name: Setting name

435

436

Returns:

437

Current setting value or None if not set

438

439

Available settings:

440

- 'autogenerate_session_id': Auto-generate session IDs (default: True)

441

- 'dict_parameter_format': Dict parameter format: 'json'|'map' (default: 'json')

442

- 'invalid_setting_action': Invalid setting action: 'send'|'drop'|'error' (default: 'error')

443

- 'max_connection_age': Max connection reuse time in seconds (default: 600)

444

- 'product_name': Product name for client identification (default: '')

445

- 'send_os_user': Include OS user in client identification (default: True)

446

- 'use_protocol_version': Use client protocol version (default: True)

447

- 'max_error_size': Maximum error message size (default: 1024)

448

- 'http_buffer_size': HTTP streaming buffer size (default: 10MB)

449

"""

450

451

def set_setting(name: str, value: Any) -> None:

452

"""

453

Set global ClickHouse Connect setting.

454

455

Parameters:

456

- name: Setting name (see get_setting for available settings)

457

- value: Setting value

458

459

Note: Changes affect new client instances created after the setting is changed

460

"""

461

462

def build_client_name(client_name: str = '') -> str:

463

"""

464

Build User-Agent string for HTTP requests.

465

466

Parameters:

467

- client_name: Custom client name to prepend

468

469

Returns:

470

Complete User-Agent string with version and system info

471

"""

472

```

473

474

## Usage Examples

475

476

### Random Data Generation

477

478

```python

479

import clickhouse_connect

480

from clickhouse_connect.tools.datagen import random_col_data, random_value_gen

481

482

client = clickhouse_connect.create_client(host='localhost')

483

484

# Generate test data for different column types

485

user_ids = random_col_data('UInt32', 1000)

486

user_names = random_col_data('String', 1000)

487

signup_dates = random_col_data('DateTime', 1000)

488

scores = random_col_data('Float64', 1000, nullable=True)

489

490

# Combine into test dataset

491

test_data = list(zip(user_ids, user_names, signup_dates, scores))

492

493

# Create test table and insert data

494

client.command("""

495

CREATE TABLE test_users (

496

id UInt32,

497

name String,

498

signup_date DateTime,

499

score Nullable(Float64)

500

) ENGINE = Memory

501

""")

502

503

client.insert('test_users', test_data,

504

column_names=['id', 'name', 'signup_date', 'score'])

505

506

print(f"Inserted {len(test_data)} test records")

507

508

# Query test data

509

result = client.query("SELECT count(), avg(score) FROM test_users WHERE score IS NOT NULL")

510

count, avg_score = result.first_row()

511

print(f"Records: {count}, Average score: {avg_score:.2f}")

512

513

# Generate streaming test data

514

def generate_test_batch(batch_id: int, batch_size: int = 1000):

515

"""Generate a batch of test data."""

516

id_gen = random_value_gen('UInt32')

517

name_gen = random_value_gen('String')

518

date_gen = random_value_gen('DateTime')

519

520

batch = []

521

for i in range(batch_size):

522

batch.append([

523

next(id_gen) + batch_id * batch_size, # Ensure unique IDs

524

f"batch_{batch_id}_{next(name_gen)}",

525

next(date_gen),

526

next(random_value_gen('Float64', nullable=True))

527

])

528

return batch

529

530

# Insert multiple batches

531

for batch_id in range(5):

532

batch_data = generate_test_batch(batch_id)

533

client.insert('test_users', batch_data,

534

column_names=['id', 'name', 'signup_date', 'score'])

535

536

print("Inserted 5 batches of test data")

537

538

# Clean up

539

client.command("DROP TABLE test_users")

540

```

541

542

### Table Context Testing

543

544

```python

545

import clickhouse_connect

546

from clickhouse_connect.tools.testing import TableContext

547

548

client = clickhouse_connect.create_client(host='localhost')

549

550

def test_user_operations():

551

"""Test user operations with automatic table cleanup."""

552

553

with TableContext(

554

client,

555

'test_users',

556

[

557

('id', 'UInt32'),

558

('name', 'String'),

559

('email', 'String'),

560

('created_at', 'DateTime')

561

],

562

engine='Memory'

563

) as table:

564

565

# Insert test data

566

test_users = [

567

[1, 'Alice', 'alice@example.com', '2023-01-01 10:00:00'],

568

[2, 'Bob', 'bob@example.com', '2023-01-02 11:00:00'],

569

[3, 'Carol', 'carol@example.com', '2023-01-03 12:00:00']

570

]

571

572

table.insert_data(test_users, ['id', 'name', 'email', 'created_at'])

573

574

# Test queries

575

result = table.query("SELECT count() FROM {}")

576

assert result.first_item() == 3, "Should have 3 users"

577

578

result = table.query("SELECT name FROM {} WHERE id = 2")

579

assert result.first_item() == 'Bob', "User 2 should be Bob"

580

581

# Test with WHERE clause

582

result = client.query(

583

f"SELECT name FROM {table.full_name} WHERE email LIKE '%@example.com'"

584

)

585

assert len(result.result_set) == 3, "All users have example.com email"

586

587

print("All user operation tests passed!")

588

589

# Table automatically cleaned up here

590

591

def test_with_multiple_tables():

592

"""Test operations across multiple tables."""

593

594

with TableContext(client, 'users', [('id', 'UInt32'), ('name', 'String')]) as users_table, \

595

TableContext(client, 'orders', [('user_id', 'UInt32'), ('amount', 'Float64')]) as orders_table:

596

597

# Insert related data

598

users_table.insert_data([[1, 'Alice'], [2, 'Bob']], ['id', 'name'])

599

orders_table.insert_data([[1, 99.99], [1, 149.99], [2, 79.99]], ['user_id', 'amount'])

600

601

# Test join query

602

result = client.query(f"""

603

SELECT u.name, sum(o.amount) as total

604

FROM {users_table.full_name} u

605

JOIN {orders_table.full_name} o ON u.id = o.user_id

606

GROUP BY u.name

607

ORDER BY total DESC

608

""")

609

610

assert len(result.result_set) == 2, "Should have 2 users with orders"

611

assert result.result_set[0][1] == 249.98, "Alice should have highest total"

612

613

print("Multi-table test passed!")

614

615

# Both tables automatically cleaned up

616

617

# Run tests

618

test_user_operations()

619

test_with_multiple_tables()

620

```

621

622

### External Data Integration

623

624

```python

625

import clickhouse_connect

626

from clickhouse_connect.driver.external import ExternalData

627

import csv

628

import io

629

630

client = clickhouse_connect.create_client(host='localhost')

631

632

def process_with_external_data():

633

"""Process data using external files in queries."""

634

635

# Create CSV data for lookup

636

csv_data = """country_code,country_name,region

637

US,United States,North America

638

CA,Canada,North America

639

GB,United Kingdom,Europe

640

DE,Germany,Europe

641

JP,Japan,Asia

642

"""

643

644

# Create external data container

645

external_data = ExternalData()

646

647

# Add CSV file as external data

648

external_data.add_file(

649

name='country_lookup',

650

content=csv_data.encode('utf-8'),

651

fmt='CSV',

652

types=['String', 'String', 'String']

653

)

654

655

# Add programmatic data as external table

656

currency_data = [

657

['US', 'USD', 1.0],

658

['CA', 'CAD', 0.75],

659

['GB', 'GBP', 1.25],

660

['DE', 'EUR', 1.10],

661

['JP', 'JPY', 0.007]

662

]

663

664

external_data.add_table(

665

name='exchange_rates',

666

data=currency_data,

667

column_names=['country_code', 'currency', 'rate'],

668

column_types=['String', 'String', 'Float64']

669

)

670

671

# Query combining main data with external data

672

result = client.query("""

673

SELECT

674

c.country_name,

675

c.region,

676

e.currency,

677

e.rate,

678

1000 * e.rate as local_amount

679

FROM country_lookup c

680

JOIN exchange_rates e ON c.country_code = e.country_code

681

ORDER BY c.region, c.country_name

682

""", external_data=external_data)

683

684

print("Country data with exchange rates:")

685

for row in result.result_set:

686

country, region, currency, rate, local_amount = row

687

print(f"{country} ({region}): {currency} - Rate: {rate}, 1000 USD = {local_amount:.2f} {currency}")

688

689

def process_large_external_file():

690

"""Process large external file efficiently."""

691

692

# Generate large CSV data

693

csv_buffer = io.StringIO()

694

writer = csv.writer(csv_buffer)

695

writer.writerow(['id', 'category', 'value'])

696

697

for i in range(10000):

698

writer.writerow([i, f'category_{i % 10}', i * 1.5])

699

700

csv_content = csv_buffer.getvalue().encode('utf-8')

701

702

external_data = ExternalData()

703

external_data.add_file(

704

name='large_dataset',

705

content=csv_content,

706

fmt='CSV',

707

types=['UInt32', 'String', 'Float64']

708

)

709

710

# Process external data with aggregation

711

result = client.query("""

712

SELECT

713

category,

714

count() as record_count,

715

avg(value) as avg_value,

716

sum(value) as total_value

717

FROM large_dataset

718

GROUP BY category

719

ORDER BY category

720

""", external_data=external_data)

721

722

print("\nLarge dataset aggregation:")

723

for category, count, avg_val, total_val in result.result_set:

724

print(f"{category}: {count} records, avg: {avg_val:.2f}, total: {total_val:.2f}")

725

726

# Run examples

727

process_with_external_data()

728

process_large_external_file()

729

```

730

731

### Configuration Management

732

733

```python

734

import clickhouse_connect

735

from clickhouse_connect.common import (

736

get_setting, set_setting, version, build_client_name

737

)

738

739

# Display package information

740

print(f"ClickHouse Connect version: {version()}")

741

print(f"Default User-Agent: {build_client_name()}")

742

print(f"Custom User-Agent: {build_client_name('MyApp/2.0')}")

743

744

# Configure global settings

745

print("\nConfiguring global settings:")

746

set_setting('autogenerate_session_id', True)

747

set_setting('max_connection_age', 7200) # 2 hours

748

749

print(f"Auto-generate session ID: {get_setting('autogenerate_session_id')}")

750

print(f"Max connection age: {get_setting('max_connection_age')} seconds")

751

752

# Create client with global settings applied

753

client = clickhouse_connect.create_client(

754

host='localhost',

755

client_name='MyApp/2.0'

756

)

757

758

# Check if session ID was auto-generated

759

session_info = client.command("SELECT sessionId()")

760

print(f"Session ID: {session_info}")

761

762

# Test connection health

763

if client.ping():

764

print("Connection is healthy")

765

766

# Get server info

767

server_version = client.command("SELECT version()")

768

server_uptime = client.command("SELECT uptime()")

769

770

print(f"Server version: {server_version}")

771

print(f"Server uptime: {server_uptime} seconds")

772

773

client.close()

774

```

775

776

### Utility Functions

777

778

```python

779

import clickhouse_connect

780

from clickhouse_connect.driver.binding import quote_identifier

781

from clickhouse_connect.driver.common import dict_copy, coerce_int, coerce_bool

782

783

client = clickhouse_connect.create_client(host='localhost')

784

785

# Safe identifier quoting

786

table_names = ['users', 'user-data', 'order_items', 'special@table']

787

for name in table_names:

788

quoted = quote_identifier(name)

789

print(f"'{name}' -> {quoted}")

790

791

# Build dynamic query with safe identifiers

792

def build_select_query(table_name: str, columns: list[str], where_clause: str = ''):

793

"""Build SELECT query with safe identifier quoting."""

794

795

safe_table = quote_identifier(table_name)

796

safe_columns = [quote_identifier(col) for col in columns]

797

798

query = f"SELECT {', '.join(safe_columns)} FROM {safe_table}"

799

if where_clause:

800

query += f" WHERE {where_clause}"

801

802

return query

803

804

# Example usage

805

query = build_select_query(

806

'user-data',

807

['user-id', 'full-name', 'created-at'],

808

'status = 1'

809

)

810

print(f"\nGenerated query: {query}")

811

812

# Configuration merging

813

base_config = {

814

'host': 'localhost',

815

'port': 8123,

816

'compress': 'lz4'

817

}

818

819

# Create variations with dict_copy

820

dev_config = dict_copy(base_config, database='development', port=8124)

821

prod_config = dict_copy(base_config,

822

host='prod.clickhouse.com',

823

database='production',

824

secure=True)

825

826

print(f"\nBase config: {base_config}")

827

print(f"Dev config: {dev_config}")

828

print(f"Prod config: {prod_config}")

829

830

# Type coercion utilities

831

test_values = ['123', '0', 'true', 'false', '1', '', None, 42]

832

833

print("\nType coercion examples:")

834

for value in test_values:

835

int_val = coerce_int(value)

836

bool_val = coerce_bool(value)

837

print(f"{repr(value):>8} -> int: {int_val:>3}, bool: {bool_val}")

838

839

client.close()

840

```