# Data Formats

Integration with the scientific Python ecosystem, including NumPy arrays, Pandas DataFrames, and PyArrow tables, for high-performance data processing and analysis workflows. Provides seamless conversion between ClickHouse data and popular data science libraries.

## Capabilities

### Pandas DataFrame Integration

Native support for querying data directly into Pandas DataFrames and inserting DataFrame data into ClickHouse tables, with automatic type conversion and index handling.

```python { .api }
def query_df(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    use_na_values: bool = True,
    max_str_len: int = 0,
    context: QueryContext | None = None
) -> pd.DataFrame:
    """
    Execute query and return results as Pandas DataFrame.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - use_na_values: Use pandas NA values for ClickHouse NULLs
    - max_str_len: Maximum string column length (0 = unlimited)
    - context: Reusable query context

    Returns:
    pandas.DataFrame with properly typed columns and index

    Requires:
    pandas package (install with: pip install clickhouse-connect[pandas])
    """

def query_df_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[pd.DataFrame, None, None]:
    """
    Stream query results as DataFrame chunks.

    Yields:
    pandas.DataFrame objects for each result block

    Requires:
    pandas package
    """

def insert_df(
    self,
    table: str,
    df: pd.DataFrame,
    database: str = '',
    settings: dict | None = None,
    column_names: Sequence[str] | None = None,
    column_type_names: Sequence[str] | None = None
):
    """
    Insert Pandas DataFrame into ClickHouse table.

    Parameters:
    - table: Target table name
    - df: Pandas DataFrame to insert
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert operation
    - column_names: Override DataFrame column names
    - column_type_names: Specify ClickHouse types for columns

    Features:
    - Automatic type conversion from pandas to ClickHouse types
    - Handles datetime, categorical, and nullable columns
    - Preserves precision for numeric types
    - Supports multi-index DataFrames

    Requires:
    pandas package
    """
```
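
A minimal round-trip sketch of these methods. The `events` table, its columns, and the `user_value_totals` target table are hypothetical, and the `{name:Type}` placeholder style is one of the parameter-binding forms clickhouse-connect accepts:

```python
import clickhouse_connect
import pandas as pd

client = clickhouse_connect.create_client(host='localhost')

# Parameterized query using server-side {name:Type} binding
df = client.query_df(
    'SELECT user_id, event_time, value FROM events WHERE event_time >= {start:DateTime}',
    parameters={'start': '2023-01-01 00:00:00'}
)

# Derive a summary and write it back; column_type_names pins the ClickHouse types
summary = df.groupby('user_id', as_index=False)['value'].sum()
client.insert_df(
    'user_value_totals',
    summary,
    column_type_names=['UInt64', 'Float64']
)
```

Passing `column_type_names` is optional; without it the driver infers ClickHouse types from the DataFrame dtypes as described above.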

### NumPy Array Integration

Direct integration with NumPy for high-performance numerical computation, with automatic handling of multidimensional arrays and dtype conversion.

```python { .api }
def query_np(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    external_data: ExternalData | None = None,
    context: QueryContext | None = None
) -> np.ndarray:
    """
    Execute query and return results as NumPy array.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - external_data: External data for query processing
    - context: Reusable query context

    Returns:
    numpy.ndarray with appropriate dtype for ClickHouse column types

    Features:
    - Automatic dtype selection based on ClickHouse types
    - Efficient memory layout for numerical operations
    - Support for structured arrays (named columns)
    - Handles nullable columns with masked arrays

    Requires:
    numpy package (install with: pip install clickhouse-connect[numpy])
    """

def query_np_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[np.ndarray, None, None]:
    """
    Stream query results as NumPy array chunks.

    Yields:
    numpy.ndarray objects for each result block

    Requires:
    numpy package
    """
```
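
A quick sketch for verifying what comes back before doing heavier math. The `sensor_data` table is hypothetical, and the dtype and shape you see depend on the column types in the query:

```python
import clickhouse_connect
import numpy as np

client = clickhouse_connect.create_client(host='localhost')

# Single numeric column: the result maps to a numeric dtype per the type mapping
readings = client.query_np('SELECT value FROM sensor_data LIMIT 100000')

print(readings.dtype, readings.shape)                  # inspect the mapped dtype first
print(np.mean(readings), np.percentile(readings, 99))  # then compute as usual
```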

### PyArrow Integration

Integration with Apache Arrow for columnar data processing, with zero-copy operations and interoperability with Arrow-based tools and file formats.

```python { .api }
def query_arrow(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> pa.Table:
    """
    Execute query and return results as PyArrow Table.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - context: Reusable query context

    Returns:
    pyarrow.Table with schema matching ClickHouse column types

    Features:
    - Zero-copy data transfer where possible
    - Preserves all ClickHouse type information
    - Efficient columnar storage format
    - Compatible with Arrow ecosystem tools

    Requires:
    pyarrow package (install with: pip install clickhouse-connect[arrow])
    """

def query_arrow_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[pa.Table, None, None]:
    """
    Stream query results as PyArrow Table chunks.

    Yields:
    pyarrow.Table objects for each result block

    Requires:
    pyarrow package
    """

def insert_arrow(
    self,
    table: str,
    arrow_table: pa.Table,
    database: str = '',
    settings: dict | None = None
):
    """
    Insert PyArrow Table into ClickHouse table.

    Parameters:
    - table: Target table name
    - arrow_table: PyArrow Table to insert
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert operation

    Features:
    - Direct columnar data transfer
    - Automatic schema mapping from Arrow to ClickHouse
    - Efficient batch processing
    - Preserves metadata and type information

    Requires:
    pyarrow package
    """
```
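
A short sketch of handing query results to the Arrow ecosystem; the `orders` table and the output file name are hypothetical:

```python
import clickhouse_connect
import pyarrow.parquet as pq

client = clickhouse_connect.create_client(host='localhost')

# Pull the result set as a columnar Arrow Table
table = client.query_arrow('SELECT * FROM orders LIMIT 1000000')

# Any Arrow-compatible tool can consume it, e.g. writing a Parquet snapshot
pq.write_table(table, 'orders_snapshot.parquet')
```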

### Arrow Batch Processing

Advanced Arrow integration supporting batch operations and RecordBatch processing for memory-efficient handling of large datasets.

```python { .api }
def to_arrow_batches(
    result: QueryResult,
    max_block_size: int = 65536
) -> Generator[pa.RecordBatch, None, None]:
    """
    Convert QueryResult to Arrow RecordBatch generator.

    Parameters:
    - result: QueryResult from query execution
    - max_block_size: Maximum rows per batch

    Yields:
    pyarrow.RecordBatch objects with consistent schema

    Requires:
    pyarrow package
    """

def arrow_buffer(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> BinaryIO:
    """
    Execute query and return Arrow IPC buffer.

    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings
    - context: Query context

    Returns:
    Binary stream containing Arrow IPC data

    Requires:
    pyarrow package
    """
```
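
A usage sketch assuming the interfaces exactly as declared above: since `arrow_buffer` is described as returning Arrow IPC data, the standard PyArrow IPC reader can consume it one RecordBatch at a time. The `events` query and the `client` from earlier examples are assumed:

```python
import pyarrow as pa

# arrow_buffer() is declared above to return raw Arrow IPC data,
# so the generic IPC stream reader can iterate it batch by batch
buffer = client.arrow_buffer('SELECT * FROM events WHERE event_date >= yesterday()')
reader = pa.ipc.open_stream(buffer)

rows = 0
for batch in reader:              # each item is a pyarrow.RecordBatch
    rows += batch.num_rows
print(f'Read {rows} rows from the IPC stream')
```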

### Data Type Conversion

Comprehensive type mapping between ClickHouse types and Python data science library types, with configurable conversion options.

```python { .api }
# ClickHouse to Pandas type mapping
CH_PANDAS_TYPE_MAP = {
    'String': 'object',
    'Int32': 'int32',
    'Int64': 'int64',
    'Float32': 'float32',
    'Float64': 'float64',
    'DateTime': 'datetime64[ns]',
    'Date': 'datetime64[ns]',
    'Bool': 'bool',
    'Nullable(Int32)': 'Int32',  # Pandas nullable integer
    'Array(String)': 'object'
}

# ClickHouse to NumPy dtype mapping
CH_NUMPY_TYPE_MAP = {
    'Int8': np.int8,
    'Int16': np.int16,
    'Int32': np.int32,
    'Int64': np.int64,
    'UInt8': np.uint8,
    'UInt16': np.uint16,
    'UInt32': np.uint32,
    'UInt64': np.uint64,
    'Float32': np.float32,
    'Float64': np.float64,
    'String': np.object_,
    'DateTime': 'datetime64[s]',
    'Date': 'datetime64[D]'
}

# ClickHouse to Arrow type mapping
CH_ARROW_TYPE_MAP = {
    'String': pa.string(),
    'Int32': pa.int32(),
    'Int64': pa.int64(),
    'Float64': pa.float64(),
    'DateTime': pa.timestamp('s'),
    'Date': pa.date32(),
    'Array(String)': pa.list_(pa.string()),
    'Nullable(Int32)': pa.int32()  # Arrow handles nullability natively
}
```
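
A brief check of how the nullable mapping above surfaces in pandas. The `signals` table and `maybe_count` column are hypothetical, and the exact extension dtype can vary with pandas and driver versions:

```python
# With use_na_values=True (the default), ClickHouse NULLs become pandas NA,
# so a Nullable(Int32) column should arrive as a nullable integer extension dtype
# rather than being upcast to float64
df = client.query_df('SELECT maybe_count FROM signals', use_na_values=True)

print(df['maybe_count'].dtype)          # expected: Int32 (pandas nullable integer)
print(df['maybe_count'].isna().sum())   # count of NULL rows
```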

## Usage Examples

### Pandas DataFrame Operations

```python
import clickhouse_connect
import pandas as pd

client = clickhouse_connect.create_client(host='localhost')

# Query to DataFrame
df = client.query_df("""
    SELECT
        user_id,
        event_time,
        event_type,
        value
    FROM events
    WHERE event_time >= '2023-01-01'
    ORDER BY event_time
""")

print(df.dtypes)
print(df.head())

# DataFrame analysis
daily_stats = df.groupby(df['event_time'].dt.date).agg({
    'user_id': 'nunique',
    'value': ['sum', 'mean', 'count']
})

# Flatten the aggregated MultiIndex columns and restore the date as a column
# so the frame maps cleanly onto the target table's schema
daily_stats.columns = ['unique_users', 'value_sum', 'value_mean', 'event_count']
daily_stats.index.name = 'event_date'
daily_stats = daily_stats.reset_index()

# Insert processed DataFrame
client.insert_df('daily_stats', daily_stats)

# Streaming large datasets
for chunk_df in client.query_df_stream(
    'SELECT * FROM large_events_table',
    settings={'max_block_size': 50000}
):
    # Process each chunk, keeping the group key as a regular column for insertion
    processed_chunk = chunk_df.groupby('category').sum().reset_index()
    client.insert_df('processed_events', processed_chunk)
```

### NumPy Array Operations

```python
import clickhouse_connect
import numpy as np

client = clickhouse_connect.create_client(host='localhost')

# Query to NumPy array
data = client.query_np("""
    SELECT
        price,
        volume,
        timestamp
    FROM market_data
    WHERE symbol = 'AAPL'
    ORDER BY timestamp
""")

# Data is returned as a structured array
prices = data['price']
volumes = data['volume']
timestamps = data['timestamp']

# Numerical analysis
price_changes = np.diff(prices)
volume_weighted_price = np.average(prices, weights=volumes)
correlation = np.corrcoef(prices[1:], volumes[1:])[0, 1]

print(f"VWAP: {volume_weighted_price}")
print(f"Price-Volume Correlation: {correlation}")

# Streaming numerical data
running_sum = 0
count = 0

for chunk in client.query_np_stream(
    'SELECT value FROM sensor_data ORDER BY timestamp',
    settings={'max_block_size': 100000}
):
    running_sum += np.sum(chunk['value'])
    count += len(chunk)

average = running_sum / count
print(f"Overall average: {average}")
```

### PyArrow Table Operations

```python
import clickhouse_connect
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

client = clickhouse_connect.create_client(host='localhost')

# Query to Arrow Table
table = client.query_arrow("""
    SELECT
        customer_id,
        product_id,
        quantity,
        price,
        order_date
    FROM orders
    WHERE order_date >= '2023-01-01'
""")

# Arrow operations
filtered_table = table.filter(
    pc.greater(table['quantity'], 10)
)

# Aggregation
summary = filtered_table.group_by(['product_id']).aggregate([
    ('quantity', 'sum'),
    ('price', 'mean'),
    ('customer_id', 'count_distinct')
])

print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")

# Convert to other formats
pandas_df = table.to_pandas()
column_dict = table.to_pydict()  # column name -> list of Python values

# Save to file formats
pq.write_table(table, 'orders_export.parquet')

# Insert Arrow data
new_data = pa.table({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Carol'],
    'score': [95.5, 87.2, 92.1]
})

client.insert_arrow('test_scores', new_data)
```

### Mixed Format Workflows

```python
import clickhouse_connect
import pandas as pd
import numpy as np
import pyarrow as pa

client = clickhouse_connect.create_client(host='localhost')

# Start with raw data query
raw_data = client.query('SELECT * FROM raw_events')

# Convert to different formats as needed
df = pd.DataFrame(raw_data.result_set, columns=raw_data.column_names)

# Data cleaning with pandas
df_clean = df.dropna().reset_index(drop=True)
df_clean['processed_time'] = pd.Timestamp.now()

# Convert to Arrow for efficient storage
arrow_table = pa.Table.from_pandas(df_clean)

# Save intermediate results
client.insert_arrow('cleaned_events', arrow_table)

# Numerical analysis with NumPy
numeric_data = client.query_np(
    'SELECT value, timestamp FROM cleaned_events ORDER BY timestamp'
)

# Time series analysis
values = numeric_data['value']
timestamps = numeric_data['timestamp']

# Moving average calculation
window_size = 100
moving_avg = np.convolve(values, np.ones(window_size)/window_size, mode='valid')

# Store results back
results_df = pd.DataFrame({
    'timestamp': timestamps[window_size-1:],
    'original_value': values[window_size-1:],
    'moving_average': moving_avg,
    'deviation': values[window_size-1:] - moving_avg
})

client.insert_df('time_series_analysis', results_df)
```

### Performance Optimization

```python
# Optimize DataFrame queries
df = client.query_df(
    'SELECT * FROM large_table',
    settings={
        'max_threads': 8,
        'max_block_size': 65536,
        'max_memory_usage': 4_000_000_000  # per-query limit in bytes (~4 GB)
    },
    use_na_values=True,   # map ClickHouse NULLs to pandas NA values
    max_str_len=1000      # cap string column width
)

# Streaming for memory efficiency
total_sum = 0
row_count = 0

for chunk in client.query_np_stream(
    'SELECT numeric_column FROM huge_table',
    settings={'max_block_size': 100000}
):
    total_sum += np.sum(chunk['numeric_column'])
    row_count += len(chunk)

average = total_sum / row_count

# Batch insert for better performance (large_df is a DataFrame prepared earlier)
batch_size = 10000
for i in range(0, len(large_df), batch_size):
    batch = large_df.iloc[i:i+batch_size]
    client.insert_df('target_table', batch)
```