# Query Operations
Run SQL against Delta tables through the Apache DataFusion integration: register one or more tables with a `QueryBuilder`, execute analytical SQL against them, and stream the results back as Arrow record batches.
## Capabilities
### QueryBuilder Class
```python { .api }
class QueryBuilder:
    def __init__(self) -> None: ...

    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: ...

    def execute(self, sql: str) -> RecordBatchReader: ...
```
A SQL query engine for Delta tables, backed by Apache DataFusion. Each call to `register` makes a `DeltaTable` available under the given name and returns the builder, so registrations can be chained.
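
Because `register` returns the builder, registrations can be chained ahead of `execute`. A minimal sketch, assuming two Delta tables at hypothetical paths that share a `user_id` column:

```python
from deltalake import DeltaTable, QueryBuilder

# Hypothetical tables, used only for illustration
events = DeltaTable("path/to/events")
users = DeltaTable("path/to/users")

# register() returns the QueryBuilder, so calls chain fluently
reader = (
    QueryBuilder()
    .register("events", events)
    .register("users", users)
    .execute("""
        SELECT u.name, COUNT(*) AS event_count
        FROM events e
        JOIN users u ON e.user_id = u.user_id
        GROUP BY u.name
    """)
)
```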
### Query Results
```python { .api }
# RecordBatchReader methods for processing results
class RecordBatchReader:
    def read_next_batch(self) -> RecordBatch | None: ...

    def read_all(self) -> list[RecordBatch]: ...

    def schema(self) -> ArrowSchema: ...

    def __iter__(self) -> Iterator[RecordBatch]: ...
```
A streaming interface over query results: batches can be pulled one at a time or iterated, so large result sets never have to be materialized in memory at once.
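
A minimal sketch of pulling batches by hand with the API above, reusing the `reader` from the previous sketch; `read_next_batch` returns `None` once the stream is exhausted:

```python
# Inspect the result schema before consuming any rows
print(reader.schema())

# Pull one batch at a time instead of materializing the full result
total_rows = 0
while True:
    batch = reader.read_next_batch()
    if batch is None:
        break
    total_rows += batch.num_rows
print(f"Streamed {total_rows} rows")
```

Iterating the reader directly (`for batch in reader:`) is equivalent, and is the pattern the examples below use.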
## Usage Examples
### Basic Query Operations
```python
from deltalake import DeltaTable, QueryBuilder

# Load tables
customers_table = DeltaTable("path/to/customers")
orders_table = DeltaTable("path/to/orders")

# Create query builder
qb = QueryBuilder()

# Register tables for querying
qb.register("customers", customers_table)
qb.register("orders", orders_table)

# Execute simple query
result = qb.execute("SELECT * FROM customers WHERE age > 25")

# Process results
for batch in result:
    df = batch.to_pandas()
    print(f"Batch with {len(df)} rows:")
    print(df.head())
```
### Complex SQL Queries
```python
import pandas as pd

# Join query
join_sql = """
SELECT
    c.customer_id,
    c.name,
    c.email,
    COUNT(o.order_id) as order_count,
    SUM(o.total_amount) as total_spent
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.registration_date >= '2023-01-01'
GROUP BY c.customer_id, c.name, c.email
HAVING COUNT(o.order_id) > 0
ORDER BY total_spent DESC
LIMIT 100
"""

result = qb.execute(join_sql)

# Convert all results to pandas
all_batches = result.read_all()
combined_df = pd.concat([batch.to_pandas() for batch in all_batches], ignore_index=True)
print("Top customers by spending:")
print(combined_df.head(10))
```
### Aggregation Queries
```python
# Monthly sales aggregation
monthly_sales_sql = """
SELECT
    DATE_TRUNC('month', order_date) as month,
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month
"""

result = qb.execute(monthly_sales_sql)
monthly_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Monthly sales summary:")
print(monthly_df)
```
### Window Functions
```python
# Ranking and window functions
ranking_sql = """
SELECT
    customer_id,
    order_date,
    total_amount,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date DESC) as order_rank,
    SUM(total_amount) OVER (PARTITION BY customer_id) as customer_total,
    LAG(total_amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as previous_order_amount
FROM orders
WHERE order_date >= '2023-01-01'
ORDER BY customer_id, order_date DESC
"""

result = qb.execute(ranking_sql)
ranking_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Customer order analysis:")
print(ranking_df.head(20))
```
### Time Series Analysis
```python
# Daily sales trend
daily_trend_sql = """
SELECT
    order_date,
    COUNT(*) as orders,
    SUM(total_amount) as revenue,
    AVG(total_amount) as avg_order_value,
    SUM(COUNT(*)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_orders,
    SUM(SUM(total_amount)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_revenue
FROM orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY order_date
ORDER BY order_date
"""

result = qb.execute(daily_trend_sql)
trend_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Daily sales trend:")
print(trend_df.head())
```
### Subqueries and CTEs
```python
# Common Table Expression (CTE) example
cte_sql = """
WITH customer_stats AS (
    SELECT
        customer_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        AVG(total_amount) as avg_order_value,
        MIN(order_date) as first_order_date,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
),
customer_segments AS (
    SELECT
        customer_id,
        order_count,
        total_spent,
        CASE
            WHEN total_spent >= 1000 THEN 'High Value'
            WHEN total_spent >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END as customer_segment
    FROM customer_stats
)
SELECT
    cs.customer_segment,
    COUNT(*) as customer_count,
    AVG(cs.total_spent) as avg_total_spent,
    AVG(cs.order_count) as avg_order_count
FROM customer_segments cs
GROUP BY cs.customer_segment
ORDER BY avg_total_spent DESC
"""

result = qb.execute(cte_sql)
segments_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Customer segmentation:")
print(segments_df)
```
### Working with Multiple Tables
```python
# Register additional tables
products_table = DeltaTable("path/to/products")
order_items_table = DeltaTable("path/to/order_items")

qb.register("products", products_table)
qb.register("order_items", order_items_table)

# Complex multi-table query
multi_table_sql = """
SELECT
    p.category,
    p.product_name,
    SUM(oi.quantity) as total_quantity_sold,
    SUM(oi.quantity * oi.unit_price) as total_revenue,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    AVG(oi.unit_price) as avg_unit_price
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= '2023-01-01'
GROUP BY p.category, p.product_name
HAVING SUM(oi.quantity) >= 10
ORDER BY total_revenue DESC
LIMIT 50
"""

result = qb.execute(multi_table_sql)
products_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Top products by revenue:")
print(products_df.head())
```
### Streaming Query Processing
```python
def process_large_query_in_batches(sql: str, batch_processor=None):
    """Process large query results in batches to manage memory"""
    result = qb.execute(sql)

    total_rows = 0
    batch_count = 0

    for batch in result:
        batch_count += 1
        batch_rows = batch.num_rows
        total_rows += batch_rows

        print(f"Processing batch {batch_count} with {batch_rows} rows")

        if batch_processor:
            # Custom processing function, called with the batch and its 1-based index
            batch_processor(batch, batch_count)
        else:
            # Default: convert to pandas and show summary
            df = batch.to_pandas()
            print(f"  Batch summary: {df.describe()}")

    print(f"Processed {total_rows} total rows in {batch_count} batches")

# Example batch processor; the batch index gives a stable, collision-free filename
def save_batch_to_csv(batch, batch_number):
    df = batch.to_pandas()
    filename = f"output_batch_{batch_number}.csv"
    df.to_csv(filename, index=False)
    print(f"  Saved batch to {filename}")

# Use streaming processing
large_query_sql = "SELECT * FROM orders WHERE order_date >= '2020-01-01'"
process_large_query_in_batches(large_query_sql, save_batch_to_csv)
```
### Query Performance Optimization
```python
# Performance tips and optimized queries

# 1. Use column selection to reduce data transfer
optimized_sql = """
SELECT customer_id, total_amount, order_date
FROM orders
WHERE order_date >= '2023-01-01'
"""

# 2. Use partition pruning when possible
partition_pruned_sql = """
SELECT * FROM orders
WHERE year = '2023' AND month = '01' -- Assuming partitioned by year/month
"""

# 3. Use LIMIT for exploration
exploration_sql = """
SELECT * FROM customers
ORDER BY registration_date DESC
LIMIT 1000
"""

# 4. Use appropriate data types in filters
typed_filter_sql = """
SELECT * FROM orders
WHERE total_amount > CAST(100.0 AS DOUBLE)
  AND order_date >= DATE '2023-01-01'
"""

# Execute optimized queries
for description, sql in [
    ("Optimized column selection", optimized_sql),
    ("Partition pruning", partition_pruned_sql),
    ("Limited exploration", exploration_sql),
    ("Typed filters", typed_filter_sql),
]:
    print(f"\n{description}:")
    result = qb.execute(sql)
    # read_next_batch() returns None if the result is empty
    first_batch = result.read_next_batch()
    if first_batch is not None:
        print(f"  First batch: {first_batch.num_rows} rows")
```
### Error Handling and Debugging
```python
def safe_query_execution(sql: str):
    """Execute query with proper error handling"""
    try:
        print(f"Executing query: {sql[:100]}...")
        result = qb.execute(sql)

        # Get schema information
        schema_info = result.schema()
        print(f"Result schema: {schema_info}")

        # Process results
        row_count = 0
        for batch in result:
            row_count += batch.num_rows

        print(f"Query completed successfully. Total rows: {row_count}")
        return row_count

    except Exception as e:
        print(f"Query failed: {e}")
        print(f"Query: {sql}")
        return None

# Test queries with error handling
test_queries = [
    "SELECT COUNT(*) FROM customers",
    "SELECT * FROM non_existent_table",  # This will fail
    "SELECT invalid_column FROM orders",  # This will fail
    "SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id LIMIT 10",
]

for query in test_queries:
    safe_query_execution(query)
    print("-" * 50)
```