CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-deltalake

Native Delta Lake Python binding based on delta-rs with Pandas integration

Pending
Overview
Eval results
Files

docs/query-operations.md

Query Operations

SQL querying capabilities using Apache DataFusion integration for running analytical queries on Delta tables with full SQL support and high performance.

Capabilities

QueryBuilder Class

class QueryBuilder:
    """SQL query engine for Delta tables using Apache DataFusion."""

    def __init__(self) -> None: ...
    
    # Registers a Delta table under `table_name` for use in SQL text.
    # Fluent: returns the builder itself so register() calls can be chained.
    def register(self, table_name: str, delta_table: DeltaTable) -> QueryBuilder: ...
    
    # Executes `sql` over the registered tables; results stream back as a
    # RecordBatchReader rather than being materialized up front.
    def execute(self, sql: str) -> RecordBatchReader: ...

SQL query engine for Delta tables using Apache DataFusion.

Query Results

# RecordBatchReader methods for processing results
class RecordBatchReader:
    """Streaming interface over query results (a sequence of Arrow RecordBatches)."""

    # Returns the next batch, or None once the stream is exhausted.
    def read_next_batch(self) -> RecordBatch | None: ...
    
    # Materializes every remaining batch into a list (loads all into memory).
    def read_all(self) -> list[RecordBatch]: ...
    
    # Arrow schema shared by all batches in this result.
    def schema(self) -> ArrowSchema: ...
    
    # Iterating the reader yields batches one at a time.
    def __iter__(self) -> Iterator[RecordBatch]: ...

Streaming interface for query results.

Usage Examples

Basic Query Operations

import pandas as pd  # required by the later examples that call pd.concat; was missing

from deltalake import DeltaTable, QueryBuilder

# Load tables
customers_table = DeltaTable("path/to/customers")
orders_table = DeltaTable("path/to/orders")

# Create query builder
qb = QueryBuilder()

# Register tables for querying; the names given here are the table names
# that the SQL text refers to
qb.register("customers", customers_table)
qb.register("orders", orders_table)

# Execute simple query
result = qb.execute("SELECT * FROM customers WHERE age > 25")

# Process results batch by batch (the reader streams record batches,
# so the full result set is never held in memory at once)
for batch in result:
    df = batch.to_pandas()
    print(f"Batch with {len(df)} rows:")
    print(df.head())

Complex SQL Queries

# Join query
join_sql = """
SELECT 
    c.customer_id,
    c.name,
    c.email,
    COUNT(o.order_id) as order_count,
    SUM(o.total_amount) as total_spent
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.registration_date >= '2023-01-01'
GROUP BY c.customer_id, c.name, c.email
HAVING COUNT(o.order_id) > 0
ORDER BY total_spent DESC
LIMIT 100
"""

result = qb.execute(join_sql)

# Convert all results to pandas
# NOTE(review): assumes `pd` (pandas) is imported and `qb` already has the
# customers/orders tables registered (see the basic example above)
all_batches = result.read_all()
combined_df = pd.concat([batch.to_pandas() for batch in all_batches], ignore_index=True)
print(f"Top customers by spending:")
print(combined_df.head(10))

Aggregation Queries

# Monthly sales aggregation (one output row per calendar month)
monthly_sales_sql = """
SELECT 
    DATE_TRUNC('month', order_date) as month,
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue,
    AVG(total_amount) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE order_date >= '2023-01-01'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month
"""

result = qb.execute(monthly_sales_sql)
# Iterating the reader streams batches; concat builds one combined frame.
monthly_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Monthly sales summary:")
print(monthly_df)

Window Functions

# Ranking and window functions: per-customer order rank, running customer
# total, and the previous order's amount via LAG
ranking_sql = """
SELECT 
    customer_id,
    order_date,
    total_amount,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date DESC) as order_rank,
    SUM(total_amount) OVER (PARTITION BY customer_id) as customer_total,
    LAG(total_amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as previous_order_amount
FROM orders
WHERE order_date >= '2023-01-01'
ORDER BY customer_id, order_date DESC
"""

result = qb.execute(ranking_sql)
# Stream all batches into a single pandas frame for display.
ranking_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Customer order analysis:")
print(ranking_df.head(20))

Time Series Analysis

# Daily sales trend
# The SUM(...) OVER (...) columns wrap the per-day aggregates to produce
# cumulative (running) totals ordered by date.
daily_trend_sql = """
SELECT 
    order_date,
    COUNT(*) as orders,
    SUM(total_amount) as revenue,
    AVG(total_amount) as avg_order_value,
    SUM(COUNT(*)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_orders,
    SUM(SUM(total_amount)) OVER (ORDER BY order_date ROWS UNBOUNDED PRECEDING) as cumulative_revenue
FROM orders
WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY order_date
ORDER BY order_date
"""

result = qb.execute(daily_trend_sql)
trend_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Daily sales trend:")
print(trend_df.head())

Subqueries and CTEs

# Common Table Expression (CTE) example
# Two chained CTEs: per-customer stats, then a spend-based segment label;
# the outer query aggregates customers per segment.
cte_sql = """
WITH customer_stats AS (
    SELECT 
        customer_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        AVG(total_amount) as avg_order_value,
        MIN(order_date) as first_order_date,
        MAX(order_date) as last_order_date
    FROM orders
    GROUP BY customer_id
),
customer_segments AS (
    SELECT 
        customer_id,
        order_count,
        total_spent,
        CASE 
            WHEN total_spent >= 1000 THEN 'High Value'
            WHEN total_spent >= 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END as customer_segment
    FROM customer_stats
)
SELECT 
    cs.customer_segment,
    COUNT(*) as customer_count,
    AVG(cs.total_spent) as avg_total_spent,
    AVG(cs.order_count) as avg_order_count
FROM customer_segments cs
GROUP BY cs.customer_segment
ORDER BY avg_total_spent DESC
"""

result = qb.execute(cte_sql)
segments_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Customer segmentation:")
print(segments_df)

Working with Multiple Tables

# Register additional tables
products_table = DeltaTable("path/to/products")
order_items_table = DeltaTable("path/to/order_items")

qb.register("products", products_table)
qb.register("order_items", order_items_table)

# Complex multi-table query
# Joins products -> order_items -> orders; HAVING keeps only products with
# at least 10 units sold in the period.
multi_table_sql = """
SELECT 
    p.category,
    p.product_name,
    SUM(oi.quantity) as total_quantity_sold,
    SUM(oi.quantity * oi.unit_price) as total_revenue,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    AVG(oi.unit_price) as avg_unit_price
FROM products p
JOIN order_items oi ON p.product_id = oi.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= '2023-01-01'
GROUP BY p.category, p.product_name
HAVING SUM(oi.quantity) >= 10
ORDER BY total_revenue DESC
LIMIT 50
"""

result = qb.execute(multi_table_sql)
products_df = pd.concat([batch.to_pandas() for batch in result], ignore_index=True)

print("Top products by revenue:")
print(products_df.head())

Streaming Query Processing

def process_large_query_in_batches(sql: str, batch_processor=None):
    """Stream a query's result batches one at a time to keep memory bounded.

    Each batch is handed to `batch_processor` when one is supplied; otherwise
    a pandas statistical summary of the batch is printed. Totals are reported
    once the stream is drained. Uses the module-level `qb` QueryBuilder.
    """
    stream = qb.execute(sql)

    total_rows = 0
    batch_count = 0

    for batch_count, batch in enumerate(stream, start=1):
        rows_in_batch = batch.num_rows
        total_rows += rows_in_batch

        print(f"Processing batch {batch_count} with {rows_in_batch} rows")

        if batch_processor:
            # Delegate to the caller-supplied handler.
            batch_processor(batch)
        else:
            # Default handling: convert to pandas and show a summary.
            frame = batch.to_pandas()
            print(f"  Batch summary: {frame.describe()}")

    print(f"Processed {total_rows} total rows in {batch_count} batches")

# Example batch processor
def save_batch_to_csv(batch):
    """Write one result batch to its own CSV file and report the file name.

    NOTE(review): the name is derived from hash(str(batch)), which is unique
    per object but not stable across runs — use a counter or uuid if
    reproducible output names are needed.
    """
    df = batch.to_pandas()
    filename = f"output_batch_{hash(str(batch))}.csv"
    df.to_csv(filename, index=False)
    # Bug fix: this previously printed a literal placeholder instead of the
    # actual file name that was written.
    print(f"  Saved batch to {filename}")

# Use streaming processing: each batch is written to CSV as it arrives,
# so the full result set is never held in memory at once
large_query_sql = "SELECT * FROM orders WHERE order_date >= '2020-01-01'"
process_large_query_in_batches(large_query_sql, save_batch_to_csv)

Query Performance Optimization

# Performance tips and optimized queries

# 1. Use column selection to reduce data transfer
optimized_sql = """
SELECT customer_id, total_amount, order_date
FROM orders 
WHERE order_date >= '2023-01-01'
"""

# 2. Use partition pruning when possible
# NOTE(review): only effective if the table really is partitioned by
# year/month — confirm against the table's partition columns
partition_pruned_sql = """
SELECT * FROM orders
WHERE year = '2023' AND month = '01'  -- Assuming partitioned by year/month
"""

# 3. Use LIMIT for exploration
exploration_sql = """
SELECT * FROM customers
ORDER BY registration_date DESC
LIMIT 1000
"""

# 4. Use appropriate data types in filters
typed_filter_sql = """
SELECT * FROM orders
WHERE total_amount > CAST(100.0 AS DOUBLE)
AND order_date >= DATE '2023-01-01'
"""

# Execute optimized queries
# Only the first batch of each result is inspected, so no query's full
# result set is materialized here
for description, sql in [
    ("Optimized column selection", optimized_sql),
    ("Partition pruning", partition_pruned_sql),
    ("Limited exploration", exploration_sql),
    ("Typed filters", typed_filter_sql)
]:
    print(f"\n{description}:")
    result = qb.execute(sql)
    first_batch = result.read_next_batch()
    if first_batch:
        print(f"  Returned {first_batch.num_rows} rows")

Error Handling and Debugging

def safe_query_execution(sql: str):
    """Run `sql` through the module-level QueryBuilder with error reporting.

    Prints a preview of the query, the result schema, and a row tally.
    Returns the total row count on success, or None when execution fails
    (the error and the offending query are printed instead of raising).
    """
    try:
        print(f"Executing query: {sql[:100]}...")
        result = qb.execute(sql)

        # Surface the result schema before consuming the stream.
        schema_info = result.schema()
        print(f"Result schema: {schema_info}")

        # Drain the stream, tallying rows without materializing any batch.
        row_count = sum(batch.num_rows for batch in result)

        print(f"Query completed successfully. Total rows: {row_count}")
        return row_count

    except Exception as e:
        print(f"Query failed: {e}")
        print(f"Query: {sql}")
        return None

# Test queries with error handling; two are intentionally invalid to
# demonstrate the failure path of safe_query_execution
test_queries = [
    "SELECT COUNT(*) FROM customers",
    "SELECT * FROM non_existent_table",  # This will fail
    "SELECT invalid_column FROM orders",  # This will fail
    "SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id LIMIT 10"
]

for query in test_queries:
    safe_query_execution(query)
    print("-" * 50)

Install with Tessl CLI

npx tessl i tessl/pypi-deltalake

docs

data-reading.md

index.md

query-operations.md

schema-management.md

table-maintenance.md

table-operations.md

transaction-management.md

writing-modification.md

tile.json