
# Data Formats

Integration with the scientific Python ecosystem, including NumPy arrays, Pandas DataFrames, and PyArrow tables, for high-performance data processing and analysis workflows. Provides seamless conversion between ClickHouse data and popular data science libraries.

## Capabilities

### Pandas DataFrame Integration

Native support for querying data directly into Pandas DataFrames and inserting DataFrame data into ClickHouse tables with automatic type conversion and index handling.

```python { .api }
def query_df(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    use_na_values: bool = True,
    max_str_len: int = 0,
    context: QueryContext | None = None
) -> pd.DataFrame:
    """
    Execute query and return results as Pandas DataFrame.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - use_na_values: Use pandas NA values for ClickHouse NULLs
    - max_str_len: Maximum string column length (0 = unlimited)
    - context: Reusable query context

    Returns:
    pandas.DataFrame with properly typed columns and index

    Requires:
    pandas package (install with: pip install clickhouse-connect[pandas])
    """

def query_df_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[pd.DataFrame, None, None]:
    """
    Stream query results as DataFrame chunks.

    Yields:
    pandas.DataFrame objects for each result block

    Requires:
    pandas package
    """

def insert_df(
    self,
    table: str,
    df: pd.DataFrame,
    database: str = '',
    settings: dict | None = None,
    column_names: Sequence[str] | None = None,
    column_type_names: Sequence[str] | None = None
):
    """
    Insert Pandas DataFrame into ClickHouse table.

    Parameters:
    - table: Target table name
    - df: Pandas DataFrame to insert
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert operation
    - column_names: Override DataFrame column names
    - column_type_names: Specify ClickHouse types for columns

    Features:
    - Automatic type conversion from pandas to ClickHouse types
    - Handles datetime, categorical, and nullable columns
    - Preserves precision for numeric types
    - Supports multi-index DataFrames

    Requires:
    pandas package
    """
```
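
For example, a minimal sketch of explicit type control on insert and NULL handling on read; the `metrics` table, its columns, and the chosen ClickHouse types here are assumptions for illustration only:

```python
import clickhouse_connect
import pandas as pd

client = clickhouse_connect.create_client(host='localhost')

# Hypothetical data; the 'metrics' table and its schema are illustrative assumptions
df = pd.DataFrame({
    'metric_id': [1, 2, 3],
    'reading': [0.5, None, 2.25],
    'recorded_at': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03'])
})

# Pin ClickHouse column types instead of relying on automatic inference
client.insert_df(
    'metrics',
    df,
    column_type_names=['UInt32', 'Nullable(Float64)', 'DateTime']
)

# Read back, mapping ClickHouse NULLs to pandas NA values and capping string length
result = client.query_df('SELECT * FROM metrics', use_na_values=True, max_str_len=100)
print(result.dtypes)
```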

### NumPy Array Integration

Direct integration with NumPy for high-performance numerical computation, with automatic handling of multidimensional arrays and dtype conversion.

```python { .api }
def query_np(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    external_data: ExternalData | None = None,
    context: QueryContext | None = None
) -> np.ndarray:
    """
    Execute query and return results as NumPy array.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - external_data: External data for query processing
    - context: Reusable query context

    Returns:
    numpy.ndarray with appropriate dtype for ClickHouse column types

    Features:
    - Automatic dtype selection based on ClickHouse types
    - Efficient memory layout for numerical operations
    - Support for structured arrays (named columns)
    - Handles nullable columns with masked arrays

    Requires:
    numpy package (install with: pip install clickhouse-connect[numpy])
    """

def query_np_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[np.ndarray, None, None]:
    """
    Stream query results as NumPy array chunks.

    Yields:
    numpy.ndarray objects for each result block

    Requires:
    numpy package
    """
```

### PyArrow Integration

Integration with Apache Arrow for columnar data processing, offering zero-copy operations and interoperability with Arrow-based tools and file formats.

```python { .api }
def query_arrow(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> pa.Table:
    """
    Execute query and return results as PyArrow Table.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - context: Reusable query context

    Returns:
    pyarrow.Table with schema matching ClickHouse column types

    Features:
    - Zero-copy data transfer where possible
    - Preserves all ClickHouse type information
    - Efficient columnar storage format
    - Compatible with Arrow ecosystem tools

    Requires:
    pyarrow package (install with: pip install clickhouse-connect[arrow])
    """

def query_arrow_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[pa.Table, None, None]:
    """
    Stream query results as PyArrow Table chunks.

    Yields:
    pyarrow.Table objects for each result block

    Requires:
    pyarrow package
    """

def insert_arrow(
    self,
    table: str,
    arrow_table: pa.Table,
    database: str = '',
    settings: dict | None = None
):
    """
    Insert PyArrow Table into ClickHouse table.

    Parameters:
    - table: Target table name
    - arrow_table: PyArrow Table to insert
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert operation

    Features:
    - Direct columnar data transfer
    - Automatic schema mapping from Arrow to ClickHouse
    - Efficient batch processing
    - Preserves metadata and type information

    Requires:
    pyarrow package
    """
```
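
The streaming variant is not covered in the usage examples below; here is a minimal sketch, assuming `query_arrow_stream` yields `pyarrow.Table` blocks as specified above and that an `events` table (an illustrative name) exists:

```python
import clickhouse_connect
import pyarrow.parquet as pq

client = clickhouse_connect.create_client(host='localhost')

# Stream Arrow blocks and append each one to a Parquet file without
# materializing the whole result set in memory
writer = None
for block in client.query_arrow_stream('SELECT * FROM events'):
    if writer is None:
        # Create the writer lazily so the schema comes from the first block
        writer = pq.ParquetWriter('events_export.parquet', block.schema)
    writer.write_table(block)

if writer is not None:
    writer.close()
```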

### Arrow Batch Processing

Advanced Arrow integration supporting batch operations and RecordBatch processing for memory-efficient handling of large datasets.

```python { .api }
def to_arrow_batches(
    result: QueryResult,
    max_block_size: int = 65536
) -> Generator[pa.RecordBatch, None, None]:
    """
    Convert QueryResult to Arrow RecordBatch generator.

    Parameters:
    - result: QueryResult from query execution
    - max_block_size: Maximum rows per batch

    Yields:
    pyarrow.RecordBatch objects with consistent schema

    Requires:
    pyarrow package
    """

def arrow_buffer(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> BinaryIO:
    """
    Execute query and return Arrow IPC buffer.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings
    - context: Query context

    Returns:
    Binary stream containing Arrow IPC data

    Requires:
    pyarrow package
    """
```
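
As a rough sketch of how these fit together, assuming `to_arrow_batches` and `arrow_buffer` behave exactly as specified above (the `big_table` name and the import location of `to_arrow_batches` are not given in this section and are assumptions here):

```python
import clickhouse_connect
import pyarrow.ipc as ipc

client = clickhouse_connect.create_client(host='localhost')

# Re-batch a completed query result into fixed-size RecordBatches
# (to_arrow_batches imported from wherever this package exposes it)
result = client.query('SELECT * FROM big_table')
for batch in to_arrow_batches(result, max_block_size=32768):
    print(batch.num_rows, batch.schema.names)

# Or read the raw Arrow IPC stream directly
buffer = client.arrow_buffer('SELECT * FROM big_table')
reader = ipc.open_stream(buffer)
table = reader.read_all()
print(table.num_rows)
```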

### Data Type Conversion

Comprehensive type mapping between ClickHouse types and Python data science library types with configurable conversion options.

```python { .api }
# ClickHouse to Pandas type mapping
CH_PANDAS_TYPE_MAP = {
    'String': 'object',
    'Int32': 'int32',
    'Int64': 'int64',
    'Float32': 'float32',
    'Float64': 'float64',
    'DateTime': 'datetime64[ns]',
    'Date': 'datetime64[ns]',
    'Bool': 'bool',
    'Nullable(Int32)': 'Int32',  # Pandas nullable integer
    'Array(String)': 'object'
}

# ClickHouse to NumPy dtype mapping
CH_NUMPY_TYPE_MAP = {
    'Int8': np.int8,
    'Int16': np.int16,
    'Int32': np.int32,
    'Int64': np.int64,
    'UInt8': np.uint8,
    'UInt16': np.uint16,
    'UInt32': np.uint32,
    'UInt64': np.uint64,
    'Float32': np.float32,
    'Float64': np.float64,
    'String': np.object_,
    'DateTime': 'datetime64[s]',
    'Date': 'datetime64[D]'
}

# ClickHouse to Arrow type mapping
CH_ARROW_TYPE_MAP = {
    'String': pa.string(),
    'Int32': pa.int32(),
    'Int64': pa.int64(),
    'Float64': pa.float64(),
    'DateTime': pa.timestamp('s'),
    'Date': pa.date32(),
    'Array(String)': pa.list_(pa.string()),
    'Nullable(Int32)': pa.int32()  # Arrow handles nullability natively
}
```
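
To see these mappings in practice, the result objects can be inspected directly. A minimal sketch (the exact dtypes and Arrow types produced depend on the driver's conversion logic; the maps above describe the intent):

```python
import clickhouse_connect

client = clickhouse_connect.create_client(host='localhost')

# A query built from ClickHouse functions, so no table is required
query = 'SELECT toInt32(1) AS i32, toFloat64(2.5) AS f64, now() AS ts'

# Pandas: Int32 -> int32, Float64 -> float64, DateTime -> datetime64[ns]
df = client.query_df(query)
print(df.dtypes)

# NumPy: dtypes follow the NumPy mapping above
arr = client.query_np(query)
print(arr.dtype)

# Arrow: the schema follows the Arrow mapping above
tbl = client.query_arrow(query)
print(tbl.schema)
```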

## Usage Examples

### Pandas DataFrame Operations

```python
import clickhouse_connect
import pandas as pd

client = clickhouse_connect.create_client(host='localhost')

# Query to DataFrame
df = client.query_df("""
    SELECT
        user_id,
        event_time,
        event_type,
        value
    FROM events
    WHERE event_time >= '2023-01-01'
    ORDER BY event_time
""")

print(df.dtypes)
print(df.head())

# DataFrame analysis
daily_stats = df.groupby(df['event_time'].dt.date).agg({
    'user_id': 'nunique',
    'value': ['sum', 'mean', 'count']
})

# Flatten the aggregated columns and move the date index into a regular column
# so the result can be inserted as an ordinary table
daily_stats.columns = ['unique_users', 'value_sum', 'value_mean', 'value_count']
daily_stats.index.name = 'event_date'
daily_stats = daily_stats.reset_index()

# Insert processed DataFrame
client.insert_df('daily_stats', daily_stats)

# Streaming large datasets
for chunk_df in client.query_df_stream(
    'SELECT * FROM large_events_table',
    settings={'max_block_size': 50000}
):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum(numeric_only=True).reset_index()
    client.insert_df('processed_events', processed_chunk)
```

### NumPy Array Operations

```python
import clickhouse_connect
import numpy as np

client = clickhouse_connect.create_client(host='localhost')

# Query to NumPy array
data = client.query_np("""
    SELECT
        price,
        volume,
        timestamp
    FROM market_data
    WHERE symbol = 'AAPL'
    ORDER BY timestamp
""")

# Data is returned as a structured array
prices = data['price']
volumes = data['volume']
timestamps = data['timestamp']

# Numerical analysis
price_changes = np.diff(prices)
volume_weighted_price = np.average(prices, weights=volumes)
correlation = np.corrcoef(prices[1:], volumes[1:])[0, 1]

print(f"VWAP: {volume_weighted_price}")
print(f"Price-Volume Correlation: {correlation}")

# Streaming numerical data
running_sum = 0
count = 0

for chunk in client.query_np_stream(
    'SELECT value FROM sensor_data ORDER BY timestamp',
    settings={'max_block_size': 100000}
):
    running_sum += np.sum(chunk['value'])
    count += len(chunk)

average = running_sum / count
print(f"Overall average: {average}")
```

### PyArrow Table Operations

```python
import clickhouse_connect
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

client = clickhouse_connect.create_client(host='localhost')

# Query to Arrow Table
table = client.query_arrow("""
    SELECT
        customer_id,
        product_id,
        quantity,
        price,
        order_date
    FROM orders
    WHERE order_date >= '2023-01-01'
""")

# Arrow operations
filtered_table = table.filter(
    pc.greater(table['quantity'], 10)
)

# Aggregation
summary = filtered_table.group_by(['product_id']).aggregate([
    ('quantity', 'sum'),
    ('price', 'mean'),
    ('customer_id', 'count_distinct')
])

print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")

# Convert to other formats
pandas_df = table.to_pandas()
column_dict = table.to_pydict()  # dict of column name -> Python list

# Save to Parquet
pq.write_table(table, 'orders_export.parquet')

# Insert Arrow data
new_data = pa.table({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Carol'],
    'score': [95.5, 87.2, 92.1]
})

client.insert_arrow('test_scores', new_data)
```

### Mixed Format Workflows

```python
import clickhouse_connect
import pandas as pd
import numpy as np
import pyarrow as pa

client = clickhouse_connect.create_client(host='localhost')

# Start with raw data query
raw_data = client.query('SELECT * FROM raw_events')

# Convert to different formats as needed
df = pd.DataFrame(raw_data.result_set, columns=raw_data.column_names)

# Data cleaning with pandas
df_clean = df.dropna().reset_index(drop=True)
df_clean['processed_time'] = pd.Timestamp.now()

# Convert to Arrow for efficient storage
arrow_table = pa.Table.from_pandas(df_clean)

# Save intermediate results
client.insert_arrow('cleaned_events', arrow_table)

# Numerical analysis with NumPy
numeric_data = client.query_np(
    'SELECT value, timestamp FROM cleaned_events ORDER BY timestamp'
)

# Time series analysis
values = numeric_data['value']
timestamps = numeric_data['timestamp']

# Moving average calculation
window_size = 100
moving_avg = np.convolve(values, np.ones(window_size) / window_size, mode='valid')

# Store results back
results_df = pd.DataFrame({
    'timestamp': timestamps[window_size - 1:],
    'original_value': values[window_size - 1:],
    'moving_average': moving_avg,
    'deviation': values[window_size - 1:] - moving_avg
})

client.insert_df('time_series_analysis', results_df)
```

### Performance Optimization

```python
# Optimize DataFrame queries
df = client.query_df(
    'SELECT * FROM large_table',
    settings={
        'max_threads': 8,
        'max_block_size': 65536,
        'max_memory_usage': '4G'
    },
    use_na_values=True,  # Map ClickHouse NULLs to pandas NA values
    max_str_len=1000     # Limit string column length
)

# Streaming for memory efficiency
total_sum = 0
row_count = 0

for chunk in client.query_np_stream(
    'SELECT numeric_column FROM huge_table',
    settings={'max_block_size': 100000}
):
    total_sum += np.sum(chunk['numeric_column'])
    row_count += len(chunk)

average = total_sum / row_count

# Batch insert for better performance
# (large_df is an existing DataFrame to be inserted in chunks)
batch_size = 10000
for i in range(0, len(large_df), batch_size):
    batch = large_df.iloc[i:i + batch_size]
    client.insert_df('target_table', batch)
```