# DataFrame Integration

Seamless integration with pandas DataFrames and PyArrow Tables, enabling SQL queries on DataFrames, table joins, and efficient data conversion between formats. This module bridges Python data science workflows with SQL analytical capabilities.
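
A minimal end-to-end sketch of what this enables (the full API is documented below):

```python
import chdb.dataframe as cdf
import pandas as pd

df = pd.DataFrame({'city': ['NYC', 'LA', 'NYC'], 'sales': [100, 80, 120]})

# Run SQL directly against the in-memory DataFrame;
# __tbl1__ is bound to the tbl1 keyword argument
res = cdf.query(sql="SELECT city, SUM(sales) AS total FROM __tbl1__ GROUP BY city", tbl1=df)
print(res.to_pandas())
```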

## Capabilities

### Table Wrapper Class

Unified interface for working with multiple data formats including DataFrames, Arrow tables, and Parquet files.

```python { .api }
class Table:
    def __init__(
        self,
        parquet_path: str = None,
        temp_parquet_path: str = None,
        parquet_memoryview: memoryview = None,
        dataframe: pd.DataFrame = None,
        arrow_table: pa.Table = None,
        use_memfd: bool = False
    ):
        """
        Initialize Table with one of several data source formats.

        Parameters:
        - parquet_path: Path to an existing Parquet file
        - temp_parquet_path: Path to a temporary Parquet file (auto-deleted)
        - parquet_memoryview: Parquet data as a memoryview
        - dataframe: pandas DataFrame
        - arrow_table: PyArrow Table
        - use_memfd: Use a memory file descriptor on Linux (falls back to a temp file)
        """

    def to_pandas(self) -> pd.DataFrame:
        """
        Convert table data to a pandas DataFrame.

        Returns:
            pd.DataFrame: Data as a pandas DataFrame

        Raises:
            ImportError: If pandas or pyarrow is not available
            ValueError: If no data buffer is available
        """

    def flush_to_disk(self):
        """
        Flush in-memory data to disk as a temporary Parquet file.
        Frees memory by moving the DataFrame/Arrow table to disk storage.
        """

    def rows_read(self) -> int:
        """Get the number of rows processed by the last query operation."""

    def bytes_read(self) -> int:
        """Get the number of bytes processed by the last query operation."""

    def elapsed(self) -> float:
        """Get the elapsed time of the last query operation in seconds."""
```
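
For instance, the same wrapper can be built from any one of the supported sources listed above (a minimal sketch; `events.parquet` is a hypothetical path):

```python
import chdb.dataframe as cdf
import pandas as pd
import pyarrow as pa

# From an in-memory pandas DataFrame
t_df = cdf.Table(dataframe=pd.DataFrame({'x': [1, 2, 3]}))

# From a PyArrow Table
t_arrow = cdf.Table(arrow_table=pa.table({'x': [1, 2, 3]}))

# From an existing Parquet file on disk (hypothetical path)
t_parquet = cdf.Table(parquet_path="events.parquet")

print(t_df.to_pandas())
```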

### Static Query Methods

Execute SQL queries on DataFrames and tables with automatic table registration.

```python { .api }
class Table:
    @staticmethod
    def queryStatic(*args, **kwargs):
        """
        Execute a SQL query on the provided tables/DataFrames.

        Parameters:
        - sql: SQL query string with table references (__tbl1__, __tbl2__, __table__)
        - tbl1, tbl2, ...: DataFrames or Table objects referenced in the query
        - output_format: Output format ("DataFrame", "JSON", "CSV", etc.)

        Returns:
            Query result in the specified format
        """
```
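
Since the module-level query() (below) is an alias for this method, the two are interchangeable; a minimal sketch:

```python
import chdb.dataframe as cdf
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})

# Static form; equivalent to cdf.query(...)
res = cdf.Table.queryStatic(sql="SELECT SUM(x) AS total FROM __tbl1__", tbl1=df)
print(res.to_pandas())
```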

### Module-Level Functions

Convenience functions for direct DataFrame querying.

```python { .api }
def query(*args, **kwargs):
    """
    Execute SQL query on DataFrames/tables. Alias for Table.queryStatic().

    Parameters:
    - sql: SQL query string
    - **kwargs: Named table parameters (tbl1, tbl2, etc.)

    Returns:
        Query result or Table object
    """

def sql(*args, **kwargs):
    """Alias for query() function with identical functionality."""

def pandas_read_parquet(*args, **kwargs):
    """
    Enhanced pandas Parquet reader with optimizations.

    Parameters:
        Same as pandas.read_parquet()

    Returns:
        pd.DataFrame: Loaded DataFrame
    """
```
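
A quick sketch of the Parquet reader (assuming a local `data.parquet` file exists; the arguments mirror pandas.read_parquet):

```python
import chdb.dataframe as cdf

# Drop-in use in place of pandas.read_parquet (hypothetical file path)
df = cdf.pandas_read_parquet("data.parquet")
print(df.head())
```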

## Usage Examples

### Basic DataFrame Queries

```python
import chdb.dataframe as cdf
import pandas as pd

# Create sample DataFrames
df1 = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
    'age': [25, 30, 35, 28]
})

df2 = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'department': ['Engineering', 'Sales', 'Marketing', 'Engineering'],
    'salary': [75000, 65000, 70000, 80000]
})

# Query single DataFrame
result = cdf.query(
    sql="SELECT name, age FROM __tbl1__ WHERE age > 28",
    tbl1=df1
)
print(result.to_pandas())

# Join multiple DataFrames
joined = cdf.query(
    sql="""
    SELECT t1.name, t1.age, t2.department, t2.salary
    FROM __tbl1__ t1
    JOIN __tbl2__ t2 ON t1.id = t2.id
    WHERE t2.salary > 70000
    ORDER BY t2.salary DESC
    """,
    tbl1=df1,
    tbl2=df2
)
print(joined.to_pandas())
```

### Working with Table Objects

```python
import chdb.dataframe as cdf
import pandas as pd

# Create DataFrame
sales_data = pd.DataFrame({
    'date': ['2024-01-01', '2024-01-02', '2024-01-03'],
    'product': ['Widget A', 'Widget B', 'Widget A'],
    'quantity': [100, 150, 75],
    'price': [9.99, 14.99, 9.99]
})

# Create Table object
sales_table = cdf.Table(dataframe=sales_data)

# Query the table
result_table = cdf.query(
    sql="""
    SELECT
        product,
        SUM(quantity) as total_quantity,
        SUM(quantity * price) as total_revenue,
        AVG(price) as avg_price
    FROM __table__
    GROUP BY product
    ORDER BY total_revenue DESC
    """,
    table=sales_table
)

# Get results as DataFrame
summary_df = result_table.to_pandas()
print("Product Summary:")
print(summary_df)

# Check query performance metrics
print(f"Rows processed: {result_table.rows_read()}")
print(f"Bytes processed: {result_table.bytes_read()}")
print(f"Query time: {result_table.elapsed():.3f} seconds")
```

### Chaining Queries on Table Results

```python
import chdb.dataframe as cdf
import pandas as pd

# Initial data
orders = pd.DataFrame({
    'order_id': [1, 2, 3, 4, 5],
    'customer_id': [101, 102, 101, 103, 102],
    'amount': [250.0, 125.5, 89.99, 450.0, 200.0],
    'order_date': ['2024-01-15', '2024-01-16', '2024-01-17', '2024-01-18', '2024-01-19']
})

# First query: Customer totals
customer_totals = cdf.query(
    sql="""
    SELECT
        customer_id,
        COUNT(*) as order_count,
        SUM(amount) as total_spent,
        AVG(amount) as avg_order
    FROM __tbl1__
    GROUP BY customer_id
    """,
    tbl1=orders
)

# Second query: Query the results of the first query
top_customers = customer_totals.query(
    sql="""
    SELECT
        customer_id,
        total_spent,
        order_count
    FROM __table__
    WHERE total_spent > 200
    ORDER BY total_spent DESC
    """
)

print("Top customers (>$200):")
print(top_customers.to_pandas())
```

### Working with Parquet Files

```python
import chdb.dataframe as cdf

# Query Parquet file directly through Table
parquet_table = cdf.Table(parquet_path="large_dataset.parquet")

# Execute complex analytical query
analysis = cdf.query(
    sql="""
    SELECT
        DATE_TRUNC('month', date_column) as month,
        category,
        COUNT(*) as record_count,
        SUM(value_column) as total_value,
        AVG(value_column) as avg_value
    FROM __table__
    WHERE date_column >= '2024-01-01'
    GROUP BY month, category
    ORDER BY month, total_value DESC
    """,
    table=parquet_table
)

# Get results as DataFrame for further processing
monthly_analysis = analysis.to_pandas()
print("Monthly analysis:")
print(monthly_analysis)

# Performance metrics
print(f"Processed {analysis.rows_read():,} rows")
print(f"Data size: {analysis.bytes_read() / 1024 / 1024:.2f} MB")
```

### Memory Management with Large DataFrames

```python
import chdb.dataframe as cdf
import numpy as np
import pandas as pd

# Create large DataFrame
large_df = pd.DataFrame({
    'id': range(1000000),
    'value': np.random.randn(1000000),
    'category': np.random.choice(['A', 'B', 'C'], 1000000)
})

# Create Table and flush to disk to save memory
large_table = cdf.Table(dataframe=large_df, use_memfd=True)
large_table.flush_to_disk()  # DataFrame is now stored as a temp Parquet file

# Query the data (loads only the needed portions)
summary = cdf.query(
    sql="""
    SELECT
        category,
        COUNT(*) as count,
        AVG(value) as avg_value,
        STDDEV(value) as std_value
    FROM __table__
    GROUP BY category
    ORDER BY category
    """,
    table=large_table
)

print("Large dataset summary:")
print(summary.to_pandas())
```

### Mixed Data Sources

```python
import chdb.dataframe as cdf
import pandas as pd

# Combine DataFrame, Parquet file, and Arrow table
df_sales = pd.DataFrame({
    'product_id': [1, 2, 3],
    'sales_q1': [1000, 1500, 800]
})

# Assuming we have an Arrow table from another source
# arrow_inventory = pa.Table.from_pydict({
#     'product_id': [1, 2, 3, 4],
#     'inventory': [50, 75, 30, 100]
# })

parquet_products = cdf.Table(parquet_path="products.parquet")

# Join across different data sources
comprehensive_report = cdf.query(
    sql="""
    SELECT
        p.product_name,
        s.sales_q1,
        p.category,
        p.price
    FROM __products__ p
    JOIN __sales__ s ON p.product_id = s.product_id
    WHERE s.sales_q1 > 900
    ORDER BY s.sales_q1 DESC
    """,
    products=parquet_products,
    sales=df_sales
)

print("Comprehensive product report:")
print(comprehensive_report.to_pandas())
```

### Using sql() Alias

```python
import chdb.dataframe as cdf
import pandas as pd

df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'score': [85, 92, 78]
})

# sql() works identically to query()
high_scores = cdf.sql(
    "SELECT name, score FROM __tbl1__ WHERE score > 80",
    tbl1=df
)

print(high_scores.to_pandas())
```

### Error Handling

```python
import chdb.dataframe as cdf
import pandas as pd
from chdb import ChdbError

df = pd.DataFrame({'a': [1, 2, 3]})

try:
    # This will fail due to non-existent column
    result = cdf.query(
        "SELECT nonexistent_column FROM __tbl1__",
        tbl1=df
    )
except ChdbError as e:
    print(f"Query failed: {e}")

try:
    # This will fail due to missing table reference
    result = cdf.query("SELECT * FROM missing_table")
except ChdbError as e:
    print(f"Table reference error: {e}")
```