Distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.
—
Execute SQL queries directly on DataFrames and registered tables with full support for standard SQL syntax, joins, aggregations, and Daft-specific extensions for multimodal data operations.
Execute SQL queries that return DataFrames.
def sql(query: str) -> DataFrame:
    """
    Execute a SQL query and return the result as a DataFrame.

    Table names used in the query refer to tables registered with
    ``daft.attach_table`` (for example, after
    ``daft.attach_table(df, "employees")`` a query may select
    ``FROM employees``). The returned DataFrame can be further refined with
    regular DataFrame methods such as ``filter``, ``select`` and ``collect``.

    Parameters:
        query: SQL query string using standard SQL syntax.

    Returns:
        DataFrame: Result of the SQL query.

    Examples:
        >>> df = daft.sql("SELECT name, age FROM my_table WHERE age > 25")
        >>> result = daft.sql("SELECT department, AVG(salary) FROM employees GROUP BY department")
    """


# Create SQL expressions for use in DataFrame operations.
def sql_expr(expression: str) -> Expression:
    """
    Create an expression from a SQL expression string.

    The resulting Expression can be used wherever DataFrame methods accept
    expressions, e.g. in ``select`` (optionally renamed with ``.alias``) or
    as a predicate in ``filter``.

    Parameters:
        expression: SQL expression string, e.g. ``"age * 2 + 1"``.

    Returns:
        Expression: Expression object for DataFrame operations.

    Examples:
        >>> expr = daft.sql_expr("age * 2 + 1")
        >>> upper_df = df.select(daft.sql_expr("UPPER(name)").alias("upper_name"))
    """


import daft
# Toy employee table, expressed column-by-column.
employee_columns = {
    "name": ["Alice", "Bob", "Charlie", "Diana"],
    "age": [25, 30, 35, 28],
    "department": ["Engineering", "Sales", "Engineering", "Marketing"],
    "salary": [75000, 65000, 85000, 70000],
}
df = daft.from_pydict(employee_columns)

# Make the DataFrame addressable from SQL under the name "employees".
daft.attach_table(df, "employees")

# Filter + projection: employees older than 27.
older_than_27_sql = "SELECT name, age FROM employees WHERE age > 27"
result1 = daft.sql(older_than_27_sql)

# Per-department head count, average salary and oldest age,
# ordered by average salary (highest first).
department_summary_sql = """
SELECT department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(age) as max_age
FROM employees
GROUP BY department
ORDER BY avg_salary DESC
"""
result2 = daft.sql(department_summary_sql)

# Complex JOIN operations
# Order records; employee_name links each order to an employee by name.
order_columns = {
    "order_id": [1, 2, 3, 4],
    "employee_name": ["Alice", "Bob", "Alice", "Charlie"],
    "amount": [1500, 2000, 1200, 1800],
    "order_date": ["2024-01-15", "2024-01-16", "2024-01-17", "2024-01-18"],
}
orders = daft.from_pydict(order_columns)
daft.attach_table(orders, "orders")

# Per-employee order count, total and average order value. The HAVING
# clause drops employees with no matching orders, so only employees who
# placed at least one order appear, ranked by total sales.
sales_summary_sql = """
SELECT e.name,
e.department,
COUNT(o.order_id) as total_orders,
SUM(o.amount) as total_sales,
AVG(o.amount) as avg_order_value
FROM employees e
LEFT JOIN orders o ON e.name = o.employee_name
GROUP BY e.name, e.department
HAVING COUNT(o.order_id) > 0
ORDER BY total_sales DESC
"""
sales_summary = daft.sql(sales_summary_sql)

# Window functions for ranking and per-partition aggregates
# ROW_NUMBER ranks salaries within each department; RANK ranks them across
# the whole table. The SUM/AVG windows have no ORDER BY, so under standard
# SQL semantics each row sees its entire partition (its department, or the
# whole table for OVER ()) rather than a running total.
window_functions_sql = """
SELECT name,
department,
salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
RANK() OVER (ORDER BY salary DESC) as overall_rank,
SUM(salary) OVER (PARTITION BY department) as dept_total_salary,
AVG(salary) OVER () as company_avg_salary
FROM employees
ORDER BY department, salary DESC
"""
windowed_results = daft.sql(window_functions_sql)

# Using WITH clauses for complex queries
# Two chained CTEs: department_stats summarises each department, then
# high_performing_depts keeps departments whose average salary exceeds 70000
# and that have at least two employees. The outer query lists the employees
# of those departments, highest salary first.
cte_sql = """
WITH department_stats AS (
SELECT department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary,
MIN(age) as min_age,
MAX(age) as max_age
FROM employees
GROUP BY department
),
high_performing_depts AS (
SELECT department
FROM department_stats
WHERE avg_salary > 70000 AND emp_count >= 2
)
SELECT e.name, e.age, e.salary, e.department
FROM employees e
INNER JOIN high_performing_depts h ON e.department = h.department
ORDER BY e.salary DESC
"""
cte_query = daft.sql(cte_sql)

# Subqueries for complex filtering
# Scalar subqueries: the WHERE clause compares each salary with the
# Engineering department's average, while salary_diff measures the gap to
# the company-wide average.
subquery_sql = """
SELECT name, age, salary, department,
salary - (SELECT AVG(salary) FROM employees) as salary_diff
FROM employees
WHERE salary > (
SELECT AVG(salary)
FROM employees
WHERE department = 'Engineering'
)
ORDER BY salary_diff DESC
"""
subquery_result = daft.sql(subquery_sql)

# Combine SQL queries with DataFrame methods
# Start from a SQL-filtered DataFrame, then keep refining it with the
# DataFrame API and collect the result.
sql_df = daft.sql("SELECT * FROM employees WHERE age BETWEEN 25 AND 35")
salary_above_70k = daft.col("salary") > 70000
salary_with_bonus = (daft.col("salary") * 1.1).alias("with_bonus")
final_result = sql_df.filter(salary_above_70k).select("name", "department", salary_with_bonus).collect()

# SQL expressions in DataFrame select
# Mix a plain column reference with SQL-string expressions, each renamed
# via .alias().
sql_projections = [
    daft.col("name"),
    daft.sql_expr("age * 2").alias("double_age"),
    daft.sql_expr("UPPER(department)").alias("dept_upper"),
    daft.sql_expr("CASE WHEN salary > 70000 THEN 'High' ELSE 'Standard' END").alias("salary_tier"),
]
df_with_sql_expr = df.select(*sql_projections)

# A single SQL predicate string can also drive a filter.
age_and_dept_predicate = daft.sql_expr("age BETWEEN 25 AND 35 AND department IN ('Engineering', 'Sales')")
filtered_df = df.filter(age_and_dept_predicate)

# Register multiple tables
customer_columns = {
    "customer_id": [1, 2, 3],
    "name": ["Corp A", "Corp B", "Corp C"],
    "industry": ["Tech", "Finance", "Healthcare"],
}
customers = daft.from_pydict(customer_columns)
daft.attach_table(customers, "customers")

# Three-way join across customers, orders and employees, restricted to Tech
# customers. NOTE: the orders table has no customer column, so the
# customers->orders join pairs customer_id with order_id purely for
# illustration (as the inline SQL comment says).
multi_table_sql = """
SELECT c.name as customer_name,
c.industry,
e.name as employee_name,
e.department,
o.amount,
o.order_date
FROM customers c
JOIN orders o ON c.customer_id = o.order_id -- Simplified relationship
JOIN employees e ON o.employee_name = e.name
WHERE c.industry = 'Tech'
ORDER BY o.order_date DESC
"""
multi_table_query = daft.sql(multi_table_sql)

# Working with different data types
mixed_columns = {
    "id": [1, 2, 3],
    "created_at": ["2024-01-01 10:00:00", "2024-01-02 15:30:00", "2024-01-03 09:15:00"],
    "tags": [["python", "sql"], ["data", "analytics"], ["machine", "learning"]],
    "metadata": ['{"priority": 1}', '{"priority": 2}', '{"priority": 3}'],
}
mixed_data = daft.from_pydict(mixed_columns)
daft.attach_table(mixed_data, "mixed_data")

# Date-part extraction, list length and JSON field access in one query.
# NOTE(review): created_at and metadata are populated with plain strings
# above; EXTRACT / JSON_EXTRACT may require converting them to a timestamp /
# parsing them first. Confirm these functions and their accepted input types
# against daft's SQL function reference.
complex_sql = """
SELECT id,
EXTRACT(YEAR FROM created_at) as year,
EXTRACT(MONTH FROM created_at) as month,
ARRAY_LENGTH(tags) as tag_count,
JSON_EXTRACT(metadata, '$.priority') as priority
FROM mixed_data
WHERE ARRAY_LENGTH(tags) > 1
ORDER BY priority DESC
"""
complex_query = daft.sql(complex_sql)

# Create temporary table from query result
# Register an aggregate query's result as its own table so later queries
# can join against it by name.
temp_result = daft.sql("""
SELECT department, AVG(salary) as avg_salary
FROM employees
GROUP BY department
""")
daft.attach_table(temp_result, "dept_averages")

# Use temporary table in subsequent queries
comparison = daft.sql("""
SELECT e.name,
e.department,
e.salary,
d.avg_salary,
e.salary - d.avg_salary as salary_diff
FROM employees e
JOIN dept_averages d ON e.department = d.department
ORDER BY salary_diff DESC
""")

# Handling SQL errors
try:
    # daft DataFrames are evaluated lazily: daft.sql() only builds the query,
    # so .collect() is called inside the try to make execution-time errors
    # surface here rather than later, outside the handler.
    # NOTE(review): whether division by zero raises or yields inf/null depends
    # on daft's arithmetic semantics for this dtype -- confirm.
    result = daft.sql("""
SELECT name, salary / 0 as invalid_calc
FROM employees
""").collect()
except Exception as e:
    print(f"SQL error: {e}")

# Validate table exists before querying
if daft.has_table("employees"):
    result = daft.sql("SELECT COUNT(*) FROM employees")
else:
    print("Table 'employees' not found")

# Daft's SQL interface supports the following table-registration functions:
def attach_table(df: DataFrame, name: str) -> None:
    """
    Make ``df`` queryable from SQL under the table name ``name``.

    For example, after ``daft.attach_table(df, "employees")`` a SQL query
    may select ``FROM employees``.
    """
def detach_table(name: str) -> None:
    """
    Unregister the table called ``name`` from the SQL context.

    Parameters:
        name: Table name previously registered for SQL queries.
    """
def has_table(name: str) -> bool:
    """
    Report whether a table called ``name`` is registered for SQL queries.

    Returns:
        bool: True if the table is registered, False otherwise.
    """
def list_tables() -> List[str]:
    """
    List all registered tables.

    Returns:
        List[str]: Names of the tables currently registered for SQL queries.
    """


# SQL queries can reference any registered table by name, enabling complex
# multi-table operations and reusable query patterns.
Install with Tessl CLI
npx tessl i tessl/pypi-daft