Native Delta Lake Python binding based on delta-rs, with Pandas integration.

SQL querying capabilities are provided through Apache DataFusion integration, enabling analytical queries on Delta tables with full SQL support and high performance.
class QueryBuilder:
    """SQL query engine for Delta tables using Apache DataFusion.

    Tables are registered under a name with :meth:`register`, then queried
    with standard SQL via :meth:`execute`.
    """

    def __init__(self) -> None: ...
    # Returns self so registrations can be chained fluently.
    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: ...
    # Executes a SQL statement against the registered tables and streams results.
    def execute(self, sql: str) -> RecordBatchReader: ...
# RecordBatchReader methods for processing results
class RecordBatchReader:
    """Streaming interface for query results.

    Yields Arrow RecordBatches one at a time so large result sets can be
    processed without materializing everything in memory.
    """

    # Returns the next batch, or None when the stream is exhausted.
    def read_next_batch(self) -> RecordBatch | None: ...
    # Drains the stream and returns every remaining batch at once.
    def read_all(self) -> list[RecordBatch]: ...
    # Arrow schema shared by all batches in this result.
    def schema(self) -> ArrowSchema: ...
    # Allows `for batch in reader:` iteration over the stream.
    def __iter__(self) -> Iterator[RecordBatch]: ...
from deltalake import DeltaTable, QueryBuilder
import pandas as pd  # required: `pd` is used for post-processing below

# Load tables
customers_table = DeltaTable("path/to/customers")
orders_table = DeltaTable("path/to/orders")

# Create query builder
qb = QueryBuilder()

# Register tables for querying (names become SQL table identifiers)
qb.register("customers", customers_table)
qb.register("orders", orders_table)

# Execute simple query
result = qb.execute("SELECT * FROM customers WHERE age > 25")

# Process results batch-by-batch as they stream in
for batch in result:
    df = batch.to_pandas()
    print(f"Batch with {len(df)} rows:")
    print(df.head())

# Join query
join_sql = """
SELECT
c.customer_id,
c.name,
c.email,
COUNT(o.order_id) as order_count,
SUM(o.total_amount) as total_spent
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.registration_date >= '2023-01-01'
GROUP BY c.customer_id, c.name, c.email
HAVING COUNT(o.order_id) > 0
ORDER BY total_spent DESC
LIMIT 100
"""
result = qb.execute(join_sql)

# Convert all results to pandas at once (fine for bounded LIMIT queries)
all_batches = result.read_all()
combined_df = pd.concat([batch.to_pandas() for batch in all_batches], ignore_index=True)
print("Top customers by spending:")  # plain string: no placeholders needed
print(combined_df.head(10))

# Monthly sales aggregation
monthly_sales_sql = """
SELECT
DATE_TRUNC('month', order_date) as month,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month
"""
# Stream the result, then stitch the per-batch frames into one DataFrame.
reader = qb.execute(monthly_sales_sql)
monthly_frames = [rb.to_pandas() for rb in reader]
monthly_df = pd.concat(monthly_frames, ignore_index=True)
print("Monthly sales summary:")
print(monthly_df)

# Ranking and window functions
ranking_sql = """
SELECT
customer_id,
order_date,
total_amount,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date DESC) as order_rank,
SUM(total_amount) OVER (PARTITION BY customer_id) as customer_total,
LAG(total_amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as previous_order_amount
FROM orders
WHERE order_date >= '2023-01-01'
ORDER BY customer_id, order_date DESC
"""
# Lazily convert each streamed batch; pd.concat consumes the generator.
batch_frames = (rb.to_pandas() for rb in qb.execute(ranking_sql))
ranking_df = pd.concat(batch_frames, ignore_index=True)
print("Customer order analysis:")
print(ranking_df.head(20))

# Daily sales trend
daily_trend_sql = """
SELECT
order_date,
COUNT(*) as orders,
SUM(total_amount) as revenue,
AVG(total_amount) as avg_order_value,
SUM(COUNT(*)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_orders,
SUM(SUM(total_amount)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_revenue
FROM orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY order_date
ORDER BY order_date
"""
# Execute, then collapse the streamed batches into a single frame.
daily_reader = qb.execute(daily_trend_sql)
trend_df = pd.concat([rb.to_pandas() for rb in daily_reader], ignore_index=True)
print("Daily sales trend:")
print(trend_df.head())

# Common Table Expression (CTE) example
cte_sql = """
WITH customer_stats AS (
SELECT
customer_id,
COUNT(*) as order_count,
SUM(total_amount) as total_spent,
AVG(total_amount) as avg_order_value,
MIN(order_date) as first_order_date,
MAX(order_date) as last_order_date
FROM orders
GROUP BY customer_id
),
customer_segments AS (
SELECT
customer_id,
order_count,
total_spent,
CASE
WHEN total_spent >= 1000 THEN 'High Value'
WHEN total_spent >= 500 THEN 'Medium Value'
ELSE 'Low Value'
END as customer_segment
FROM customer_stats
)
SELECT
cs.customer_segment,
COUNT(*) as customer_count,
AVG(cs.total_spent) as avg_total_spent,
AVG(cs.order_count) as avg_order_count
FROM customer_segments cs
GROUP BY cs.customer_segment
ORDER BY avg_total_spent DESC
"""
# Two-stage CTE: per-customer stats, then value-tier segmentation.
segment_reader = qb.execute(cte_sql)
segment_frames = [rb.to_pandas() for rb in segment_reader]
segments_df = pd.concat(segment_frames, ignore_index=True)
print("Customer segmentation:")
print(segments_df)

# Register additional tables
products_table = DeltaTable("path/to/products")
order_items_table = DeltaTable("path/to/order_items")
qb.register("products", products_table)
qb.register("order_items", order_items_table)

# Complex multi-table query
multi_table_sql = """
SELECT
p.category,
p.product_name,
SUM(oi.quantity) as total_quantity_sold,
SUM(oi.quantity * oi.unit_price) as total_revenue,
COUNT(DISTINCT o.customer_id) as unique_customers,
AVG(oi.unit_price) as avg_unit_price
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= '2023-01-01'
GROUP BY p.category, p.product_name
HAVING SUM(oi.quantity) >= 10
ORDER BY total_revenue DESC
LIMIT 50
"""
result = qb.execute(multi_table_sql)
products_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)
print("Top products by revenue:")
print(products_df.head())


def process_large_query_in_batches(sql: str, batch_processor=None):
    """Process large query results in batches to manage memory.

    Args:
        sql: SQL statement executed against the module-level ``qb``.
        batch_processor: optional callable invoked with each RecordBatch;
            when omitted, each batch is summarized via pandas instead.
    """
    result = qb.execute(sql)
    total_rows = 0
    batch_count = 0
    for batch in result:
        batch_count += 1
        batch_rows = batch.num_rows
        total_rows += batch_rows
        print(f"Processing batch {batch_count} with {batch_rows} rows")
        if batch_processor:
            # Custom processing function
            batch_processor(batch)
        else:
            # Default: convert to pandas and show summary
            df = batch.to_pandas()
            print(f" Batch summary: {df.describe()}")
    print(f"Processed {total_rows} total rows in {batch_count} batches")
# Example batch processor
def save_batch_to_csv(batch):
    """Write a single RecordBatch to its own CSV file and report the path."""
    df = batch.to_pandas()
    # NOTE(review): hash(str(batch)) is not stable across interpreter runs;
    # an explicit counter or timestamp would give reproducible filenames.
    filename = f"output_batch_{hash(str(batch))}.csv"
    df.to_csv(filename, index=False)
    print(f" Saved batch to {filename}")  # fixed: report the actual filename


# Use streaming processing
large_query_sql = "SELECT * FROM orders WHERE order_date >= '2020-01-01'"
process_large_query_in_batches(large_query_sql, save_batch_to_csv)

# Performance tips and optimized queries
# 1. Use column selection to reduce data transfer
# (projecting only needed columns lets the scan skip unused Parquet columns)
optimized_sql = """
SELECT customer_id, total_amount, order_date
FROM orders
WHERE order_date >= '2023-01-01'
"""
# 2. Use partition pruning when possible
# (filters on partition columns skip entire files)
partition_pruned_sql = """
SELECT * FROM orders
WHERE year = '2023' AND month = '01' -- Assuming partitioned by year/month
"""
# 3. Use LIMIT for exploration
# (bounds the result size during ad-hoc inspection)
exploration_sql = """
SELECT * FROM customers
ORDER BY registration_date DESC
LIMIT 1000
"""
# 4. Use appropriate data types in filters
# (explicit CAST/DATE literals avoid implicit-conversion surprises)
typed_filter_sql = """
SELECT * FROM orders
WHERE total_amount > CAST(100.0 AS DOUBLE)
AND order_date >= DATE '2023-01-01'
"""
# Execute optimized queries
for description, sql in [
    ("Optimized column selection", optimized_sql),
    ("Partition pruning", partition_pruned_sql),
    ("Limited exploration", exploration_sql),
    ("Typed filters", typed_filter_sql),
]:
    print(f"\n{description}:")
    result = qb.execute(sql)
    # Only peek at the first batch; enough to confirm the query runs.
    first_batch = result.read_next_batch()
    if first_batch:
        print(f" Returned {first_batch.num_rows} rows")


def safe_query_execution(sql: str):
    """Execute query with proper error handling.

    Args:
        sql: SQL statement executed against the module-level ``qb``.

    Returns:
        Total number of result rows on success, or ``None`` on failure.
    """
    try:
        print(f"Executing query: {sql[:100]}...")
        result = qb.execute(sql)
        # Get schema information
        schema_info = result.schema()
        print(f"Result schema: {schema_info}")
        # Process results
        row_count = 0
        for batch in result:
            row_count += batch.num_rows
        print(f"Query completed successfully. Total rows: {row_count}")
        return row_count
    except Exception as e:
        # Broad catch is deliberate: this helper demonstrates reporting any
        # query failure without aborting the surrounding script.
        print(f"Query failed: {e}")
        print(f"Query: {sql}")
        return None
# Test queries with error handling
test_queries = [
    "SELECT COUNT(*) FROM customers",
    "SELECT * FROM non_existent_table",  # This will fail
    "SELECT invalid_column FROM orders",  # This will fail
    "SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id LIMIT 10",
]
for query in test_queries:
    safe_query_execution(query)
    print("-" * 50)

# Install with Tessl CLI
# npx tessl i tessl/pypi-deltalake